From ddc7624e5fc9bfc69171db8bc5b4adb2bff6991b Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 27 Jan 2021 16:17:22 -0700 Subject: [PATCH 01/36] sensor now fails on cancelled runs --- dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py index 4c5c376..9a3ab6a 100644 --- a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py +++ b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py @@ -38,11 +38,11 @@ def poke(self, context): self.log.info('State of Run ID {}: {}'.format(self.run_id, run_status)) TERMINAL_RUN_STATES = ['Success', 'Error', 'Cancelled'] - FAILED_RUN_STATES = ['Error'] + FAILED_RUN_STATES = ['Error', 'Cancelled'] - if run_status in FAILED_RUN_STATES: - return AirflowException('dbt cloud Run ID {} Failed.'.format(self.run_id)) - if run_status in TERMINAL_RUN_STATES: + if run_status.strip() in FAILED_RUN_STATES: + raise AirflowException('dbt cloud Run ID {} Failed.'.format(self.run_id)) + if run_status.strip() in TERMINAL_RUN_STATES: return True else: return False From fab9d7a4ff5afb24dae020e5c0ab4f2b7b303807 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 27 Jan 2021 16:49:29 -0700 Subject: [PATCH 02/36] updates so this can be installed as a package --- dbt_cloud_plugin/__init__.py | 10 ---------- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 2 +- .../operators/dbt_cloud_run_job_operator.py | 2 +- .../sensors/dbt_cloud_job_sensor.py | 2 +- setup.py | 18 ++++++++++++++++++ 5 files changed, 21 insertions(+), 13 deletions(-) create mode 100644 setup.py diff --git a/dbt_cloud_plugin/__init__.py b/dbt_cloud_plugin/__init__.py index 085d692..e69de29 100644 --- a/dbt_cloud_plugin/__init__.py +++ b/dbt_cloud_plugin/__init__.py @@ -1,10 +0,0 @@ -from airflow.plugins_manager import AirflowPlugin -from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook -from 
dbt_cloud_plugin.operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -from dbt_cloud_plugin.sensors.dbt_cloud_run_sensor import DbtCloudRunSensor - -class DbtCloudPlugin(AirflowPlugin): - name = "dbt_cloud_plugin" - operators = [DbtCloudRunJobOperator] - hooks = [DbtCloudHook] - sensors = [DbtCloudRunSensor] diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index f2f4980..574d8f7 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -1,4 +1,4 @@ -from dbt_cloud_plugin.dbt_cloud.dbt_cloud import DbtCloud +from ..dbt_cloud.dbt_cloud import DbtCloud from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index 17b8faa..ac66f66 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -4,9 +4,9 @@ import time from airflow.models import BaseOperator -from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException +from ..hooks.dbt_cloud_hook import DbtCloudHook class DbtCloudRunJobOperator(BaseOperator): """ diff --git a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py index 9a3ab6a..6ab907d 100644 --- a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py +++ b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py @@ -1,4 +1,4 @@ -from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook +from ..hooks.dbt_cloud_hook import DbtCloudHook from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..cf2d7e2 
--- /dev/null +++ b/setup.py @@ -0,0 +1,18 @@ + +from distutils.core import setup + +setup( + name='dbt_cloud_plugin', + version='0.1', + packages=[ + 'dbt_cloud_plugin', + 'dbt_cloud_plugin.dbt_cloud', + 'dbt_cloud_plugin.hooks', + 'dbt_cloud_plugin.operators', + 'dbt_cloud_plugin.sensors' + ], + install_requires=[ + 'apache-airflow' + ] + +) From bed6444bc7310f1ce6b03949e9b8ee4c39e8e96e Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Thu, 28 Jan 2021 09:15:27 -0700 Subject: [PATCH 03/36] updates to send optional params to run job --- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 30 +++++++++++++++++++ .../operators/dbt_cloud_run_job_operator.py | 15 ++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 574d8f7..ad655e0 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -44,6 +44,18 @@ def get_conn(self): return DbtCloud(dbt_cloud_account_id, dbt_cloud_api_token) + def _get_conn_extra(self): + conn = self.get_connection(self.dbt_cloud_conn_id).extra_dejson + config = {} + if 'git_branch' in conn: + config['git_branch'] = conn['git_branch'] + if 'schema_override' in conn: + config['schema_override'] = conn['schema_override'] + if 'target_name_override' in conn: + config['target_name_override'] = conn['target_name_override'] + + return config + def get_run_status(self, run_id): """ Return the status of an dbt cloud run. 
@@ -53,3 +65,21 @@ def get_run_status(self, run_id): run = dbt_cloud.try_get_run(run_id=run_id) status_name = RunStatus.lookup(run['status']) return status_name + + def run_job(self, job_name, git_branch=None, schema_override=None, + target_name_override=None): + dbt_cloud = self.get_conn() + extra = self._get_conn_extra() + + data = {'cause': 'Kicked off via Airflow'} + # add optional settings + if git_branch or extra.get('git_branch', None): + data['git_branch'] = git_branch or extra.get('git_branch', None) + if schema_override or extra.get('schema_override', None): + data['schema_override'] = schema_override or extra.get('schema_override', None) + if target_name_override or extra.get('target_name_override', None): + data['target_name_override'] = target_name_override or extra.get('target_name_override', None) + + self.log.info(f'Triggering job {job_name} with data {data}') + + return dbt_cloud.run_job(job_name, data=data) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index ac66f66..6c7021e 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -23,6 +23,9 @@ class DbtCloudRunJobOperator(BaseOperator): def __init__(self, dbt_cloud_conn_id=None, job_name=None, + git_branch=None, + schema_override=None, + target_name_override=None, *args, **kwargs): super(DbtCloudRunJobOperator, self).__init__(*args, **kwargs) @@ -34,6 +37,9 @@ def __init__(self, self.dbt_cloud_conn_id = dbt_cloud_conn_id self.job_name = job_name + self.git_branch = git_branch + self.schema_override = schema_override + self.target_name_override = target_name_override def execute(self, **kwargs): @@ -41,9 +47,12 @@ def execute(self, **kwargs): try: dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) - dbt_cloud = dbt_cloud_hook.get_conn() - data = {'cause':'Kicked off via Airflow'} - trigger_resp = 
dbt_cloud.run_job(self.job_name, data=data) + trigger_resp = dbt_cloud_hook.run_job( + self.job_name, + git_branch=self.git_branch, + schema_override=self.schema_override, + target_name_override=self.target_name_override + ) self.log.info('Triggered Run ID {}'.format(trigger_resp['id'])) except RuntimeError as e: raise AirflowException("Error while triggering job {}: {}".format(self.job_name, e)) From 510ade5928357969416f7ede4c056ff6633396cf Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Thu, 28 Jan 2021 13:42:25 -0700 Subject: [PATCH 04/36] operator now accepts steps_override parameter --- dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 2 +- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 4 +++- dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py | 5 ++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index 7b0afdb..aee6b34 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ -30,7 +30,7 @@ def _get(self, url_suffix): def _post(self, url_suffix, data=None): url = self.api_base + url_suffix headers = {'Authorization': 'token %s' % self.api_token} - response = requests.post(url, headers=headers, data=data) + response = requests.post(url, headers=headers, json=data) if response.status_code == 200: return json.loads(response.content) else: diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index ad655e0..7a912fc 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -67,7 +67,7 @@ def get_run_status(self, run_id): return status_name def run_job(self, job_name, git_branch=None, schema_override=None, - target_name_override=None): + target_name_override=None, steps_override=None): dbt_cloud = self.get_conn() extra = self._get_conn_extra() @@ -79,6 +79,8 @@ def run_job(self, job_name, git_branch=None, schema_override=None, 
data['schema_override'] = schema_override or extra.get('schema_override', None) if target_name_override or extra.get('target_name_override', None): data['target_name_override'] = target_name_override or extra.get('target_name_override', None) + if steps_override: + data['steps_override'] = steps_override self.log.info(f'Triggering job {job_name} with data {data}') diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index 6c7021e..ad2d6d3 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -26,6 +26,7 @@ def __init__(self, git_branch=None, schema_override=None, target_name_override=None, + steps_override=None, *args, **kwargs): super(DbtCloudRunJobOperator, self).__init__(*args, **kwargs) @@ -40,6 +41,7 @@ def __init__(self, self.git_branch = git_branch self.schema_override = schema_override self.target_name_override = target_name_override + self.steps_override = steps_override def execute(self, **kwargs): @@ -51,7 +53,8 @@ def execute(self, **kwargs): self.job_name, git_branch=self.git_branch, schema_override=self.schema_override, - target_name_override=self.target_name_override + target_name_override=self.target_name_override, + steps_override=self.steps_override ) self.log.info('Triggered Run ID {}'.format(trigger_resp['id'])) except RuntimeError as e: From ad80010085cac85f0b677c8e4151bc0e9a72086f Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 3 Feb 2021 08:08:08 -0700 Subject: [PATCH 05/36] adds new run and watch operator to facilitate retries --- .../dbt_cloud_run_and_watch_job_operator.py | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py new file mode 
100644 index 0000000..3f5582b --- /dev/null +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +import json +import requests +import time + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException, AirflowSkipException +from ..hooks.dbt_cloud_hook import DbtCloudHook +from ..operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator + + +class DbtCloudRunAndWatchJobOperator(DbtCloudRunJobOperator): + """ + Operator to run a dbt cloud job. + :param dbt_cloud_conn_id: dbt Cloud connection ID. + :type dbt_cloud_conn_id: string + :param project_id: dbt Cloud project ID. + :type project_id: int + :param job_name: dbt Cloud job name. + :type job_name: string + """ + + @apply_defaults + def __init__(self, + poke_interval=60, + timeout=60 * 60 * 24, + soft_fail=False, + *args, **kwargs): + self.poke_interval = poke_interval + self.timeout = timeout + self.soft_fail = soft_fail + super(DbtCloudRunAndWatchJobOperator, self).__init__(*args, **kwargs) + + def execute(self, **kwargs): + run_id = super(DbtCloudRunAndWatchJobOperator, self).execute(**kwargs) + + # basically copy-pasting the Sensor code + self.log.info(f'Starting poke for job {run_id}') + try_number = 1 + started_at = time.monotonic() + + def run_duration(): + nonlocal started_at + return time.monotonic() - started_at + + while not self.poke(run_id): + if run_duration() > self.timeout: + if self.soft_fail: + raise AirflowSkipException(f'Time is out!') + else: + raise AirflowException(f'Time is out!') + else: + time.sleep(self.poke_interval) + try_number += 1 + self.log.info('Success criteria met. 
Exiting.') + + def poke(self, run_id): + self.log.info('Sensor checking state of dbt cloud run ID: %s', run_id) + dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) + run_status = dbt_cloud_hook.get_run_status(run_id=run_id) + self.log.info('State of Run ID {}: {}'.format(run_id, run_status)) + + TERMINAL_RUN_STATES = ['Success', 'Error', 'Cancelled'] + FAILED_RUN_STATES = ['Error', 'Cancelled'] + + if run_status.strip() in FAILED_RUN_STATES: + raise AirflowException('dbt cloud Run ID {} Failed.'.format(self.run_id)) + if run_status.strip() in TERMINAL_RUN_STATES: + return True + else: + return False From e8e7f9158dfaa851ab9f6de7bd464f58d5d89df0 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 3 Feb 2021 09:51:01 -0700 Subject: [PATCH 06/36] small fix to error --- .../operators/dbt_cloud_run_and_watch_job_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 3f5582b..68d1c1e 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -65,7 +65,7 @@ def poke(self, run_id): FAILED_RUN_STATES = ['Error', 'Cancelled'] if run_status.strip() in FAILED_RUN_STATES: - raise AirflowException('dbt cloud Run ID {} Failed.'.format(self.run_id)) + raise AirflowException('dbt cloud Run ID {} Failed.'.format(run_id)) if run_status.strip() in TERMINAL_RUN_STATES: return True else: From f54acdb408c1ac6e6b44822e595ca5624e7db92a Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Tue, 7 Dec 2021 14:53:31 -0700 Subject: [PATCH 07/36] Environment id filter (#1) * adds ability to filter jobs by environment_id * correctly imports AirflowException * adds print statements for debugging * fixes environment_id types * reverts print statements --- dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 15 +++++++++++---- 
dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 9 +++++++-- .../operators/dbt_cloud_run_job_operator.py | 5 ++++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index aee6b34..c46d981 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ -3,6 +3,9 @@ import requests import time +from airflow.exceptions import AirflowException + + class DbtCloud(object): """ Class for interacting with the dbt Cloud API @@ -36,8 +39,12 @@ def _post(self, url_suffix, data=None): else: raise RuntimeError(response.content) - def list_jobs(self): - return self._get('/accounts/%s/jobs/' % self.account_id).get('data') + def list_jobs(self, environment_id=None): + jobs = self._get('/accounts/%s/jobs/' % self.account_id).get('data') + if environment_id is not None: + return [j for j in jobs if str(j['environment_id']) == str(environment_id)] + else: + return jobs def get_run(self, run_id): return self._get('/accounts/%s/runs/%s/' % (self.account_id, run_id)).get('data') @@ -56,8 +63,8 @@ def try_get_run(self, run_id, max_tries=3): raise RuntimeError("Too many failures ({}) while querying for run status".format(run_id)) - def run_job(self, job_name, data=None): - jobs = self.list_jobs() + def run_job(self, job_name, data=None, environment_id=None): + jobs = self.list_jobs(environment_id=environment_id) job_matches = [j for j in jobs if j['name'] == job_name] diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 7a912fc..975f070 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -53,6 +53,8 @@ def _get_conn_extra(self): config['schema_override'] = conn['schema_override'] if 'target_name_override' in conn: config['target_name_override'] = conn['target_name_override'] + if 'environment_id' in conn: + config['environment_id'] = conn['environment_id'] return 
config @@ -67,7 +69,7 @@ def get_run_status(self, run_id): return status_name def run_job(self, job_name, git_branch=None, schema_override=None, - target_name_override=None, steps_override=None): + target_name_override=None, steps_override=None, environment_id=None): dbt_cloud = self.get_conn() extra = self._get_conn_extra() @@ -82,6 +84,9 @@ def run_job(self, job_name, git_branch=None, schema_override=None, if steps_override: data['steps_override'] = steps_override + # get environment + environment_id = environment_id or extra.get('environment_id', None) + self.log.info(f'Triggering job {job_name} with data {data}') - return dbt_cloud.run_job(job_name, data=data) + return dbt_cloud.run_job(job_name, data=data, environment_id=environment_id) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index ad2d6d3..2fd3d9c 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -27,6 +27,7 @@ def __init__(self, schema_override=None, target_name_override=None, steps_override=None, + environment_id=None, *args, **kwargs): super(DbtCloudRunJobOperator, self).__init__(*args, **kwargs) @@ -42,6 +43,7 @@ def __init__(self, self.schema_override = schema_override self.target_name_override = target_name_override self.steps_override = steps_override + self.environment_id = environment_id def execute(self, **kwargs): @@ -54,7 +56,8 @@ def execute(self, **kwargs): git_branch=self.git_branch, schema_override=self.schema_override, target_name_override=self.target_name_override, - steps_override=self.steps_override + steps_override=self.steps_override, + environment_id=self.environment_id ) self.log.info('Triggered Run ID {}'.format(trigger_resp['id'])) except RuntimeError as e: From 4d0a89cd43c47d3602584a1a56a61ad0d194de80 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Tue, 26 Sep 2023 14:34:43 -0600 Subject: [PATCH 08/36] add a 
new operator to check specific model results (#2) --- dbt_cloud_plugin/__init__.py | 1 + dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 14 ++ dbt_cloud_plugin/helpers.py | 56 ++++++++ dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 33 +++++ .../dbt_cloud_check_model_result_operator.py | 111 +++++++++++++++ .../operators/dbt_cloud_run_job_operator.py | 4 +- setup.py | 4 +- tests/__init__.py | 0 ...t_dbt_cloud_check_model_result_operator.py | 134 ++++++++++++++++++ tests/test_generate_dbt_model_dependency.py | 102 +++++++++++++ 10 files changed, 456 insertions(+), 3 deletions(-) create mode 100644 dbt_cloud_plugin/helpers.py create mode 100644 dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py create mode 100644 tests/__init__.py create mode 100644 tests/test_dbt_cloud_check_model_result_operator.py create mode 100644 tests/test_generate_dbt_model_dependency.py diff --git a/dbt_cloud_plugin/__init__.py b/dbt_cloud_plugin/__init__.py index e69de29..df320c5 100644 --- a/dbt_cloud_plugin/__init__.py +++ b/dbt_cloud_plugin/__init__.py @@ -0,0 +1 @@ +from .helpers import generate_dbt_model_dependency diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index c46d981..547ff62 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ -74,3 +74,17 @@ def run_job(self, job_name, data=None, environment_id=None): job_def = job_matches[0] trigger_resp = self.trigger_job_run(job_id=job_def['id'], data=data) return trigger_resp + + def get_artifact(self, run_id, artifact_filename, step=None): + if step is not None: + query_string = f'?step={step}' + else: + query_string = '' + + return self._get( + f'/accounts/{self.account_id}/runs/{run_id}/artifacts/{artifact_filename}{query_string}' + ) + + def get_job(self, job_id): + return self._get(f'/accounts/{self.account_id}/jobs/{job_id}').get('data') + diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py new file mode 100644 
index 0000000..043b70d --- /dev/null +++ b/dbt_cloud_plugin/helpers.py @@ -0,0 +1,56 @@ +from airflow.utils.task_group import TaskGroup +from airflow.models import BaseOperator + +from .operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator +from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator + + +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True): + """ + Create a dependency from one or more tasks on a set of models succeeding + in a dbt task. This function generates a new DbtCloudCheckModelResultOperator + task between dbt_job_task and downstream_tasks, checking that dependent_models + all ran successfully. + + :param dbt_job_task: The dbt Cloud operator which kicked off the run you want to check. + Both the credentials and the run_id will be pulled from this task. + :type dbt_job_task: DbtCloudRunJobOperator or DbtCloudRunAndWatchJobOperator + :param downstream_tasks: The downstream task(s) which depend on the model(s) succeeding. + Can be either a single task, a single TaskGroup, or a list of tasks. + :type downstream_tasks: BaseOperator or TaskGroup or list[BaseOperator] or list[TaskGroup] + :param dependent_models: The name(s) of the model(s) to check. See + DbtCloudCheckModelResultOperator for more details. + :type dependent_models: str or list[str] + :param ensure_models_ran: Whether to require that the dependent_models actually ran in + the run. If False, it will silently ignore models that didn't run. 
+ :type ensure_models_ran: bool, default True + """ + + if not isinstance(dbt_job_task, DbtCloudRunJobOperator): + raise TypeError('dbt_job_task must be of type DbtCloudRunJobOperator or DbtCloudRunAndWatchOperator') + + if isinstance(downstream_tasks, list): + if len(downstream_tasks) == 0: + raise ValueError('You must pass at least one task in downstream_tasks') + if not (isinstance(downstream_tasks[0], BaseOperator) or isinstance(downstream_tasks[0], TaskGroup)): + raise TypeError('The elements of the downstream_tasks list must be of type BaseOperator or TaskGRoup') + elif not (isinstance(downstream_tasks, TaskGroup) or isinstance(downstream_tasks, BaseOperator)): + raise TypeError('downstream_tasks must be of one of the following types: BaseOperator, TaskGroup, or a list of one of those two') + + if isinstance(dependent_models, str): + dependent_models = [dependent_models] + model_ids = '__'.join(dependent_models) + task_id = f'check_dbt_model_results__{dbt_job_task.task_id}__{model_ids}' + task_id = task_id[:255] + + check_dbt_model_results = DbtCloudCheckModelResultOperator( + task_id=task_id, + dbt_cloud_conn_id=dbt_job_task.dbt_cloud_conn_id, + dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}', + model_names=dependent_models, + ensure_models_ran=ensure_models_ran, + trigger_rule='all_done', + retries=0 + ) + + return dbt_job_task >> check_dbt_model_results >> downstream_tasks diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 975f070..24f3fb3 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -68,6 +68,39 @@ def get_run_status(self, run_id): status_name = RunStatus.lookup(run['status']) return status_name + def get_run_manifest(self, run_id): + """ + Return the manifest.json from a dbt Cloud run. 
+ """ + dbt_cloud = self.get_conn() + return dbt_cloud.get_artifact(run_id, 'manifest.json') + + def get_all_run_results(self, run_id): + """ + Return the results array from run_results.json from a dbt Cloud run, + concatenated across all (real) steps. + """ + dbt_cloud = self.get_conn() + + # first, determine the number of steps in this job + # it will either be defined in the run or in the job definition + run = dbt_cloud.get_run(run_id) + total_steps = len(run['run_steps']) + if total_steps == 0: # not defined on the run, check the job + job_id = run['job_id'] + job = dbt_cloud.get_job(job_id) + total_steps = len(job['execute_steps']) + + # the first 3 steps of a dbt Cloud job are always the same and + # never have any run results + starting_step = 4 + all_run_results = [] + for step in range(starting_step, starting_step + total_steps): + run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step) + all_run_results.extend(run_results['results']) + + return all_run_results + def run_job(self, job_name, git_branch=None, schema_override=None, target_name_override=None, steps_override=None, environment_id=None): dbt_cloud = self.get_conn() diff --git a/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py new file mode 100644 index 0000000..c2f65a9 --- /dev/null +++ b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +import json +import requests +import time + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException, AirflowSkipException +from ..hooks.dbt_cloud_hook import DbtCloudHook + + +SUCCESSFUL_STATUSES = ['success', 'pass'] + + +class DbtModelException(Exception): + pass + + +class DbtModelFailedException(DbtModelException): + pass + + +class DbtModelNotRunException(DbtModelException): + pass + + +class 
DbtCloudCheckModelResultOperator(BaseOperator): + """ + Check the results of a dbt Cloud job to see whether the model(s) you + care about ran successfully. Useful if you have a large dbt Cloud job, + but each of your downstream tasks only requires a small subset of models + to succeed. + + :param dbt_cloud_run_id: Run ID of a finished dbt Cloud job. Note that + this task must not start running until the dbt Cloud job has completed, + otherwise it will error out. See DbtCloudRunAndWatchJobOperator or + DbtCloudRunSensor. + :type dbt_cloud_run_id: str or int + :param model_names: A single model name or list of model names to check. + Tests and snapshot names also work. Note this must be the /name/, not the + node ID. In addition to the model name(s) supplied, all of the tests for + that model will also be checked (though unlike the model itself, they need + not have been run). + :type model_names: str or list[str] + :param dbt_cloud_conn_id: dbt Cloud connection ID + :type dbt_cloud_conn_id: str + :param ensure_models_ran: Whether to ensure all of the model_names were actually + executed in this run. Defaults to True to avoid accidentally mistyping the + model name, negating the value of this check. 
+ :type ensure_models_ran: bool, default True + """ + + template_fields = ['dbt_cloud_run_id'] + + @apply_defaults + def __init__(self, dbt_cloud_run_id=None, model_names=None, ensure_models_ran=True, dbt_cloud_conn_id='dbt_default', *args, **kwargs): + super(DbtCloudCheckModelResultOperator, self).__init__(*args, **kwargs) + + if dbt_cloud_run_id is None: + raise AirflowException('No dbt Cloud run_id was supplied.') + if model_names is None: + raise AirflowException('No model names supplied.') + + self.dbt_cloud_conn_id = dbt_cloud_conn_id + self.dbt_cloud_run_id = dbt_cloud_run_id + if isinstance(model_names, str): + model_names = [model_names] + self.model_names = model_names + self.ensure_models_ran = ensure_models_ran + + def _find_test_dependencies_for_model_id(self, model_id, manifest): + tests = [] + for node, values in manifest['nodes'].items(): + if values['resource_type'] != 'test': + continue + for dependency in values['depends_on']['nodes']: + if dependency == model_id: + tests.append(node) + return tests + + def _find_model_id_from_name(self, model_name, manifest): + models = manifest['nodes'].values() + for model in models: + if model['name'] == model_name: + return model['unique_id'] + + def _check_that_model_passed(self, model_name, manifest, run_results): + model_id = self._find_model_id_from_name(model_name, manifest) + tests = self._find_test_dependencies_for_model_id(model_id, manifest) + all_dependencies = [model_id] + tests + self.log.info(f'Checking all dependencies for {model_name}: {all_dependencies}') + + ran_model = False + for result in run_results: + if result['unique_id'] == model_id: + ran_model = True + if result['unique_id'] in all_dependencies: + if result['status'] not in SUCCESSFUL_STATUSES: + raise DbtModelFailedException(f'Dependency {result["unique_id"]} did not pass, status: {result["status"]}!') + + if not ran_model and self.ensure_models_ran: + raise DbtModelNotRunException(f'Model {model_id} was not run!') + + def 
execute(self, **kwargs): + dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) + manifest = dbt_cloud_hook.get_run_manifest(self.dbt_cloud_run_id) + run_results = dbt_cloud_hook.get_all_run_results(self.dbt_cloud_run_id) + + for model in self.model_names: + self._check_that_model_passed(model, manifest, run_results) + diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index 2fd3d9c..3006230 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -63,4 +63,6 @@ def execute(self, **kwargs): except RuntimeError as e: raise AirflowException("Error while triggering job {}: {}".format(self.job_name, e)) - return trigger_resp['id'] + run_id = trigger_resp['id'] + self.xcom_push(kwargs['context'], 'dbt_cloud_run_id', run_id) + return run_id diff --git a/setup.py b/setup.py index cf2d7e2..c1901e2 100644 --- a/setup.py +++ b/setup.py @@ -3,13 +3,13 @@ setup( name='dbt_cloud_plugin', - version='0.1', + version='0.2', packages=[ 'dbt_cloud_plugin', 'dbt_cloud_plugin.dbt_cloud', 'dbt_cloud_plugin.hooks', 'dbt_cloud_plugin.operators', - 'dbt_cloud_plugin.sensors' + 'dbt_cloud_plugin.sensors', ], install_requires=[ 'apache-airflow' diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_dbt_cloud_check_model_result_operator.py b/tests/test_dbt_cloud_check_model_result_operator.py new file mode 100644 index 0000000..7697289 --- /dev/null +++ b/tests/test_dbt_cloud_check_model_result_operator.py @@ -0,0 +1,134 @@ +import unittest +from unittest.mock import MagicMock, patch +from airflow.exceptions import AirflowException +from dbt_cloud_plugin.operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator, DbtModelFailedException, DbtModelNotRunException + +class TestDbtCloudCheckModelResultOperator(unittest.TestCase): + 
+ def setUp(self): + self.operator = DbtCloudCheckModelResultOperator( + task_id='test_task', + dbt_cloud_conn_id='dbt_cloud_conn', + dbt_cloud_run_id='run_id', + model_names=['model1', 'model2'], + ) + + def test_init_with_invalid_run_id(self): + with self.assertRaises(AirflowException): + DbtCloudCheckModelResultOperator( + task_id='test_task', + dbt_cloud_conn_id='dbt_cloud_conn', + dbt_cloud_run_id=None, + model_names=['model1'], + ) + + def test_init_with_invalid_model_names(self): + with self.assertRaises(AirflowException): + DbtCloudCheckModelResultOperator( + task_id='test_task', + dbt_cloud_conn_id='dbt_cloud_conn', + dbt_cloud_run_id='run_id', + model_names=None, + ) + + def test_find_test_dependencies_for_model_id(self): + manifest = { + 'nodes': { + 'model_id1': {'resource_type': 'model', 'depends_on': {'nodes': []}}, + 'model_id2': {'resource_type': 'model', 'depends_on': {'nodes': []}}, + 'test_id1': {'resource_type': 'test', 'depends_on': {'nodes': ['model_id1']}}, + 'test_id2': {'resource_type': 'test', 'depends_on': {'nodes': ['model_id2']}}, + } + } + result = self.operator._find_test_dependencies_for_model_id('model_id1', manifest) + self.assertEqual(result, ['test_id1']) + + def test_find_model_id_from_name(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1'}, + 'model_id2': {'name': 'model2', 'unique_id': 'model_id2'}, + } + } + result = self.operator._find_model_id_from_name('model2', manifest) + self.assertEqual(result, 'model_id2') + + def test_check_that_model_passed(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1', 'resource_type': 'model'}, + } + } + run_results = [{'unique_id': 'model_id1', 'status': 'success'}] + self.operator._check_that_model_passed('model1', manifest, run_results) + + run_results = [{'unique_id': 'model_id1', 'status': 'pass'}] + self.operator._check_that_model_passed('model1', manifest, run_results) + + def 
test_check_that_model_passed_test_dependency(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1', 'resource_type': 'model'}, + 'test_id1': {'name': 'test1', 'unique_id': 'test_id1', 'resource_type': 'test', + 'depends_on': {'nodes': ['model_id1']}} + } + } + run_results = [{'unique_id': 'model_id1', 'status': 'success'}] + self.operator._check_that_model_passed('model1', manifest, run_results) + + run_results = [{'unique_id': 'model_id1', 'status': 'pass'}] + self.operator._check_that_model_passed('model1', manifest, run_results) + + run_results = [{'unique_id': 'model_id1', 'status': 'pass'}, + {'unique_id': 'test_id1', 'status': 'failed'}] + with self.assertRaises(DbtModelFailedException): + self.operator._check_that_model_passed('model1', manifest, run_results) + + def test_check_that_model_failed(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1', 'resource_type': 'model'}, + } + } + run_results = [{'unique_id': 'model_id1', 'status': 'failed'}] + with self.assertRaises(DbtModelFailedException): + self.operator._check_that_model_passed('model1', manifest, run_results) + + def test_check_that_model_did_not_run_when_ensuring_models_ran(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1', 'resource_type': 'model'}, + } + } + run_results = [] + with self.assertRaises(DbtModelNotRunException): + self.operator._check_that_model_passed('model1', manifest, run_results) + + def test_ignore_that_model_did_not_run_when_not_ensuring_models_ran(self): + manifest = { + 'nodes': { + 'model_id1': {'name': 'model1', 'unique_id': 'model_id1', 'resource_type': 'model'}, + } + } + run_results = [] + self.operator.ensure_models_ran = False + self.operator._check_that_model_passed('model1', manifest, run_results) + + @patch('dbt_cloud_plugin.operators.dbt_cloud_check_model_result_operator.DbtCloudHook') + def test_execute(self, mock_hook_class): + mock_hook 
= MagicMock() + mock_hook_class.return_value = mock_hook + mock_hook.get_run_manifest.return_value = {} + mock_hook.get_all_run_results.return_value = [] + self.operator._check_that_model_passed = MagicMock() + self.operator._check_that_model_passed.return_value = None + + self.operator.execute() + + mock_hook.get_run_manifest.assert_called_once_with('run_id') + mock_hook.get_all_run_results.assert_called_once_with('run_id') + self.operator._check_that_model_passed.assert_any_call('model1', {}, []) + self.operator._check_that_model_passed.assert_any_call('model2', {}, []) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_generate_dbt_model_dependency.py b/tests/test_generate_dbt_model_dependency.py new file mode 100644 index 0000000..8f0d473 --- /dev/null +++ b/tests/test_generate_dbt_model_dependency.py @@ -0,0 +1,102 @@ +import unittest +from unittest.mock import Mock +from datetime import datetime + +from airflow import DAG +from airflow.utils.task_group import TaskGroup +from airflow.operators.dummy import DummyOperator + +from dbt_cloud_plugin.helpers import generate_dbt_model_dependency +from dbt_cloud_plugin.operators.dbt_cloud_run_and_watch_job_operator import DbtCloudRunAndWatchJobOperator +from dbt_cloud_plugin.operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator + + +class TestGenerateDbtModelDependency(unittest.TestCase): + + def test_generate_dbt_model_dependency_with_single_task(self): + # Create a minimal DAG for testing + dag = DAG(dag_id='test_dag', start_date=datetime(2023, 1, 1), catchup=False) + with dag: + dbt_job_task = DbtCloudRunAndWatchJobOperator(task_id='dbt_job_task', dbt_cloud_conn_id='dbt_default', job_name='test job') + + task1 = DummyOperator(task_id='task1') + + generate_dbt_model_dependency(dbt_job_task, task1, ['model1', 'model2'], ensure_models_ran=True) + + self.assertEqual(len(dag.tasks), 3) + # find the generated task + for task in dag.tasks: + if isinstance(task, 
DbtCloudCheckModelResultOperator): + result = task + + # Verify the generated dependencies + self.assertEqual(result.task_id, 'check_dbt_model_results__dbt_job_task__model1__model2') + self.assertIsInstance(result, DbtCloudCheckModelResultOperator) + self.assertEqual(result.dbt_cloud_conn_id, dbt_job_task.dbt_cloud_conn_id) + self.assertEqual(result.ensure_models_ran, True) + self.assertEqual(result.trigger_rule, 'all_done') + self.assertEqual(result.retries, 0) + self.assertIn(dbt_job_task, result.get_flat_relatives(upstream=True)) + self.assertIn(task1, result.get_flat_relatives(upstream=False)) + + def test_generate_dbt_model_dependency_with_list(self): + # Create a minimal DAG for testing + dag = DAG(dag_id='test_dag', start_date=datetime(2023, 1, 1), catchup=False) + with dag: + dbt_job_task = DbtCloudRunAndWatchJobOperator(task_id='dbt_job_task', dbt_cloud_conn_id='dbt_default', job_name='test job') + + task1 = DummyOperator(task_id='task1') + task2 = DummyOperator(task_id='task2') + downstream_tasks = [task1, task2] + + generate_dbt_model_dependency(dbt_job_task, downstream_tasks, ['model1', 'model2'], ensure_models_ran=True) + + self.assertEqual(len(dag.tasks), 4) + # find the generated task + for task in dag.tasks: + if isinstance(task, DbtCloudCheckModelResultOperator): + result = task + + # Verify the generated dependencies + self.assertEqual(result.task_id, 'check_dbt_model_results__dbt_job_task__model1__model2') + self.assertIsInstance(result, DbtCloudCheckModelResultOperator) + self.assertEqual(result.dbt_cloud_conn_id, dbt_job_task.dbt_cloud_conn_id) + self.assertEqual(result.ensure_models_ran, True) + self.assertEqual(result.trigger_rule, 'all_done') + self.assertEqual(result.retries, 0) + self.assertIn(dbt_job_task, result.get_flat_relatives(upstream=True)) + self.assertIn(task1, result.get_flat_relatives(upstream=False)) + self.assertIn(task2, result.get_flat_relatives(upstream=False)) + + + def 
test_generate_dbt_model_dependency_with_task_group(self): + # Create a minimal DAG for testing + dag = DAG(dag_id='test_dag', start_date=datetime(2023, 1, 1), catchup=False) + with dag: + dbt_job_task = DbtCloudRunAndWatchJobOperator(task_id='dbt_job_task', dbt_cloud_conn_id='dbt_default', job_name='test job') + + with TaskGroup(group_id='task_group_name') as downstream_tasks: + task1 = DummyOperator(task_id='task1') + task2 = DummyOperator(task_id='task2') + + generate_dbt_model_dependency(dbt_job_task, downstream_tasks, ['model1', 'model2'], ensure_models_ran=False) + + self.assertEqual(len(dag.tasks), 4) + # find the generated task + for task in dag.tasks: + if isinstance(task, DbtCloudCheckModelResultOperator): + result = task + + # Verify the generated dependencies + self.assertEqual(result.task_id, 'check_dbt_model_results__dbt_job_task__model1__model2') + self.assertEqual(result.dbt_cloud_conn_id, dbt_job_task.dbt_cloud_conn_id) + self.assertEqual(result.ensure_models_ran, False) + self.assertEqual(result.trigger_rule, 'all_done') + self.assertEqual(result.retries, 0) + self.assertIn(dbt_job_task, result.get_flat_relatives(upstream=True)) + self.assertIn(task1, result.get_flat_relatives(upstream=False)) + self.assertIn(task2, result.get_flat_relatives(upstream=False)) + + +if __name__ == '__main__': + unittest.main() From 2d493a02740ed4e4f6aca92afde59e488291ad83 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 27 Sep 2023 14:23:07 -0600 Subject: [PATCH 09/36] more meaningful errors for DbtCloudCheckModelResultOperator --- dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 2 +- .../dbt_cloud_check_model_result_operator.py | 2 ++ ...t_dbt_cloud_check_model_result_operator.py | 27 +++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index 547ff62..28ec750 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ 
-28,7 +28,7 @@ def _get(self, url_suffix): if response.status_code == 200: return json.loads(response.content) else: - raise RuntimeError(response.content) + raise RuntimeError(f'Error getting URL {url}: {response.content}') def _post(self, url_suffix, data=None): url = self.api_base + url_suffix diff --git a/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py index c2f65a9..795c844 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py @@ -102,6 +102,8 @@ def _check_that_model_passed(self, model_name, manifest, run_results): raise DbtModelNotRunException(f'Model {model_id} was not run!') def execute(self, **kwargs): + if self.dbt_cloud_run_id is None or self.dbt_cloud_run_id == '': + raise ValueError('dbt_cloud_run_id is empty!') dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) manifest = dbt_cloud_hook.get_run_manifest(self.dbt_cloud_run_id) run_results = dbt_cloud_hook.get_all_run_results(self.dbt_cloud_run_id) diff --git a/tests/test_dbt_cloud_check_model_result_operator.py b/tests/test_dbt_cloud_check_model_result_operator.py index 7697289..63db4cd 100644 --- a/tests/test_dbt_cloud_check_model_result_operator.py +++ b/tests/test_dbt_cloud_check_model_result_operator.py @@ -130,5 +130,32 @@ def test_execute(self, mock_hook_class): self.operator._check_that_model_passed.assert_any_call('model1', {}, []) self.operator._check_that_model_passed.assert_any_call('model2', {}, []) + @patch('dbt_cloud_plugin.operators.dbt_cloud_check_model_result_operator.DbtCloudHook') + def test_execute_fails_with_missing_run_id(self, mock_hook_class): + mock_hook = MagicMock() + mock_hook_class.return_value = mock_hook + mock_hook.get_run_manifest.return_value = {} + mock_hook.get_all_run_results.return_value = [] + self.operator._check_that_model_passed = MagicMock() + 
self.operator._check_that_model_passed.return_value = None + + self.operator.dbt_cloud_run_id = '' + with self.assertRaises(ValueError): + self.operator.execute() + + @patch('dbt_cloud_plugin.operators.dbt_cloud_check_model_result_operator.DbtCloudHook') + def test_execute_fails_with_missing_run_id(self, mock_hook_class): + mock_hook = MagicMock() + mock_hook_class.return_value = mock_hook + mock_hook.get_run_manifest.return_value = {} + mock_hook.get_all_run_results.return_value = [] + self.operator._check_that_model_passed = MagicMock() + self.operator._check_that_model_passed.return_value = None + + self.operator.dbt_cloud_run_id = None + with self.assertRaises(ValueError): + self.operator.execute() + + if __name__ == '__main__': unittest.main() From 4a45096da4653bd66f61001f269c27c2e71902d0 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 27 Sep 2023 15:19:53 -0600 Subject: [PATCH 10/36] fix for formatting of the error message --- dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index 28ec750..906ecc7 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ -28,7 +28,7 @@ def _get(self, url_suffix): if response.status_code == 200: return json.loads(response.content) else: - raise RuntimeError(f'Error getting URL {url}: {response.content}') + raise RuntimeError(f'Error getting URL {url}:\n{str(response.content)}') def _post(self, url_suffix, data=None): url = self.api_base + url_suffix From dd76a027dfeff2c843e1cf84913052c7c9412d67 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Wed, 27 Sep 2023 15:27:14 -0600 Subject: [PATCH 11/36] fix for string version of None --- .../operators/dbt_cloud_check_model_result_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py 
b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py index 795c844..972a9f7 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py @@ -102,7 +102,7 @@ def _check_that_model_passed(self, model_name, manifest, run_results): raise DbtModelNotRunException(f'Model {model_id} was not run!') def execute(self, **kwargs): - if self.dbt_cloud_run_id is None or self.dbt_cloud_run_id == '': + if self.dbt_cloud_run_id is None or self.dbt_cloud_run_id in ('', 'None'): raise ValueError('dbt_cloud_run_id is empty!') dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) manifest = dbt_cloud_hook.get_run_manifest(self.dbt_cloud_run_id) From 0cd66860bfe2c171f84df3c2003f4635dfa1edf2 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 24 Oct 2023 09:55:12 -0700 Subject: [PATCH 12/36] added option to set number of retries for generate model deps --- dbt_cloud_plugin/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 043b70d..b9b73ee 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -5,7 +5,7 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True): +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): """ Create a dependency from one or more tasks on a set of models succeeding in a dbt task. 
This function generates a new DbtCloudCheckModelResultOperator @@ -50,7 +50,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode model_names=dependent_models, ensure_models_ran=ensure_models_ran, trigger_rule='all_done', - retries=0 + retries=retries ) return dbt_job_task >> check_dbt_model_results >> downstream_tasks From f0a327982f67351fcd2107cfd63a8fec9555b042 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 24 Oct 2023 10:28:19 -0700 Subject: [PATCH 13/36] Revert "added option to set number of retries for generate model deps" This reverts commit 0cd66860bfe2c171f84df3c2003f4635dfa1edf2. --- dbt_cloud_plugin/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index b9b73ee..043b70d 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -5,7 +5,7 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True): """ Create a dependency from one or more tasks on a set of models succeeding in a dbt task. 
This function generates a new DbtCloudCheckModelResultOperator @@ -50,7 +50,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode model_names=dependent_models, ensure_models_ran=ensure_models_ran, trigger_rule='all_done', - retries=retries + retries=0 ) return dbt_job_task >> check_dbt_model_results >> downstream_tasks From 2940fe7ca282da44b4dde73deb5c670561fb6a35 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 24 Oct 2023 10:32:32 -0700 Subject: [PATCH 14/36] added retries to generate model dependency --- dbt_cloud_plugin/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 043b70d..b9b73ee 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -5,7 +5,7 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True): +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): """ Create a dependency from one or more tasks on a set of models succeeding in a dbt task. 
This function generates a new DbtCloudCheckModelResultOperator @@ -50,7 +50,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode model_names=dependent_models, ensure_models_ran=ensure_models_ran, trigger_rule='all_done', - retries=0 + retries=retries ) return dbt_job_task >> check_dbt_model_results >> downstream_tasks From 47da0f96c7a947fe5aaa9fc1942ff242279ceb70 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 09:23:33 -0800 Subject: [PATCH 15/36] changed defualt upstream check --- dbt_cloud_plugin/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index b9b73ee..47b5f9b 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -5,7 +5,7 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, trigger_rule='none_skipped', retries=0): """ Create a dependency from one or more tasks on a set of models succeeding in a dbt task. 
This function generates a new DbtCloudCheckModelResultOperator @@ -49,7 +49,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}', model_names=dependent_models, ensure_models_ran=ensure_models_ran, - trigger_rule='all_done', + trigger_rule=trigger_rule, retries=retries ) From 2766372caa323bf6f66eba88e3fea4d9dd1d0fd3 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 11:54:14 -0800 Subject: [PATCH 16/36] added short circuit operator and placed in task group --- dbt_cloud_plugin/helpers.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index b9b73ee..872969b 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -1,10 +1,14 @@ from airflow.utils.task_group import TaskGroup from airflow.models import BaseOperator +from airflow.operators.python_operator import ShortCircuitOperator from .operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator +def get_state(task_id, **context): + return context['dag_run'].get_task_instance(task_id).state + def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): """ Create a dependency from one or more tasks on a set of models succeeding @@ -43,14 +47,22 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode task_id = f'check_dbt_model_results__{dbt_job_task.task_id}__{model_ids}' task_id = task_id[:255] - check_dbt_model_results = DbtCloudCheckModelResultOperator( - task_id=task_id, - dbt_cloud_conn_id=dbt_job_task.dbt_cloud_conn_id, - dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}', - 
model_names=dependent_models, - ensure_models_ran=ensure_models_ran, - trigger_rule='all_done', - retries=retries - ) + with TaskGroup(group_id=task_id) as check_dbt_model_results: + check_upstream_dbt_job_state = ShortCircuitOperator( + task_id='check_upstream_dbt_job_state', + python_callable=lambda: get_state(dbt_job_task) == 'success' or get_state(dbt_job_task) == 'failed', + provide_context=True, + ) + + check_dbt_model_successful = DbtCloudCheckModelResultOperator( + task_id='check_dbt_model_successful', + dbt_cloud_conn_id=dbt_job_task.dbt_cloud_conn_id, + dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}', + model_names=dependent_models, + ensure_models_ran=ensure_models_ran, + retries=retries + ) + + check_upstream_dbt_job_state >> check_dbt_model_successful return dbt_job_task >> check_dbt_model_results >> downstream_tasks From f47995343fff4b5edb1b426563df399543375a03 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 12:20:05 -0800 Subject: [PATCH 17/36] fixed period issue --- dbt_cloud_plugin/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 872969b..2c2c9a0 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -45,7 +45,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode dependent_models = [dependent_models] model_ids = '__'.join(dependent_models) task_id = f'check_dbt_model_results__{dbt_job_task.task_id}__{model_ids}' - task_id = task_id[:255] + task_id = task_id[:255].replace('.', '__') with TaskGroup(group_id=task_id) as check_dbt_model_results: check_upstream_dbt_job_state = ShortCircuitOperator( From fab815c37687a9a4fcf6cb0f0b43ac51053f18d5 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 16:19:04 -0800 Subject: [PATCH 18/36] fixed to local version match --- dbt_cloud_plugin/helpers.py | 9 +++------ 1 file changed, 3 
insertions(+), 6 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 2c2c9a0..6c3577c 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -6,9 +6,6 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def get_state(task_id, **context): - return context['dag_run'].get_task_instance(task_id).state - def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): """ Create a dependency from one or more tasks on a set of models succeeding @@ -50,8 +47,8 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode with TaskGroup(group_id=task_id) as check_dbt_model_results: check_upstream_dbt_job_state = ShortCircuitOperator( task_id='check_upstream_dbt_job_state', - python_callable=lambda: get_state(dbt_job_task) == 'success' or get_state(dbt_job_task) == 'failed', - provide_context=True, + python_callable=lambda: dbt_job_task.state == 'success' or dbt_job_task.state == 'failed', + trigger_rule='all_done' ) check_dbt_model_successful = DbtCloudCheckModelResultOperator( @@ -65,4 +62,4 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode check_upstream_dbt_job_state >> check_dbt_model_successful - return dbt_job_task >> check_dbt_model_results >> downstream_tasks + return dbt_job_task >> check_dbt_model_results >> downstream_tasks \ No newline at end of file From 3dccb78ada1077ba5c1d8f1d1a37ed6b3365be9d Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 16:43:47 -0800 Subject: [PATCH 19/36] fixed callback --- dbt_cloud_plugin/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 6c3577c..97f638f 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -48,7 +48,8 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode 
check_upstream_dbt_job_state = ShortCircuitOperator( task_id='check_upstream_dbt_job_state', python_callable=lambda: dbt_job_task.state == 'success' or dbt_job_task.state == 'failed', - trigger_rule='all_done' + trigger_rule='all_done', + on_failure_callback=None, ) check_dbt_model_successful = DbtCloudCheckModelResultOperator( From 30820f9c9872f2688bb7bd5dfc0ea05027383ca4 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 17:46:28 -0800 Subject: [PATCH 20/36] updated lambda function --- dbt_cloud_plugin/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 97f638f..a52d152 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -47,9 +47,10 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode with TaskGroup(group_id=task_id) as check_dbt_model_results: check_upstream_dbt_job_state = ShortCircuitOperator( task_id='check_upstream_dbt_job_state', - python_callable=lambda: dbt_job_task.state == 'success' or dbt_job_task.state == 'failed', + python_callable=lambda **context: context['dag_run'].get_task_instance(task_id).state in ['success', 'failed'], trigger_rule='all_done', on_failure_callback=None, + provide_context=True ) check_dbt_model_successful = DbtCloudCheckModelResultOperator( From b2c93e702eef003899feabf3071355ce901412c6 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 18:54:56 -0800 Subject: [PATCH 21/36] change --- dbt_cloud_plugin/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index a52d152..8e0cc05 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -47,7 +47,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode with TaskGroup(group_id=task_id) as check_dbt_model_results: check_upstream_dbt_job_state = ShortCircuitOperator( 
task_id='check_upstream_dbt_job_state', - python_callable=lambda **context: context['dag_run'].get_task_instance(task_id).state in ['success', 'failed'], + python_callable=lambda **context: context['dag_run'].get_task_instance(dbt_job_task).state in ['success', 'failed'], trigger_rule='all_done', on_failure_callback=None, provide_context=True From 915118502614e6100048cf74cd30fb332b6ba7ba Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 19:16:07 -0800 Subject: [PATCH 22/36] changed task --- dbt_cloud_plugin/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 8e0cc05..622f0a2 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -47,7 +47,7 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode with TaskGroup(group_id=task_id) as check_dbt_model_results: check_upstream_dbt_job_state = ShortCircuitOperator( task_id='check_upstream_dbt_job_state', - python_callable=lambda **context: context['dag_run'].get_task_instance(dbt_job_task).state in ['success', 'failed'], + python_callable=lambda **context: context['dag_run'].get_task_instance(dbt_job_task.task_id).state in ['success', 'failed'], trigger_rule='all_done', on_failure_callback=None, provide_context=True From 36bc97f3bec0a6cc1ca7f96fd249211beb61f120 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 6 Dec 2023 19:49:28 -0800 Subject: [PATCH 23/36] default notification rules work --- dbt_cloud_plugin/helpers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 622f0a2..073e2af 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -49,7 +49,6 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode task_id='check_upstream_dbt_job_state', python_callable=lambda **context: context['dag_run'].get_task_instance(dbt_job_task.task_id).state in 
['success', 'failed'], trigger_rule='all_done', - on_failure_callback=None, provide_context=True ) From ecfc497dd0aaee50839692a9eeaf357d585ac79c Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Mon, 12 Feb 2024 17:46:49 -0800 Subject: [PATCH 24/36] added params to dag generation --- dbt_cloud_plugin/helpers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 073e2af..35fc1ce 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -6,7 +6,7 @@ from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0): +def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0, params=None): """ Create a dependency from one or more tasks on a set of models succeeding in a dbt task. This function generates a new DbtCloudCheckModelResultOperator @@ -58,7 +58,8 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}', model_names=dependent_models, ensure_models_ran=ensure_models_ran, - retries=retries + retries=retries, + params=params ) check_upstream_dbt_job_state >> check_dbt_model_successful From 43148c07ca40bcda5117b4934d06d3f19a77efae Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 5 Mar 2024 15:47:55 -0800 Subject: [PATCH 25/36] Added Alerting For Model Owners --- dbt_cloud_plugin/helpers.py | 20 ++++++++++++- .../dbt_cloud_run_and_watch_job_operator.py | 30 +++++++++++++++---- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index 35fc1ce..d2a69c5 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -1,11 +1,29 @@ from airflow.utils.task_group import 
TaskGroup from airflow.models import BaseOperator from airflow.operators.python_operator import ShortCircuitOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException from .operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator +class DbtCloudRunException(AirflowException): + @apply_defaults + def __init__(self, dbt_cloud_run_id: int, error_message: str, dbt_errors_dict: dict, *args, **kwargs): + if dbt_cloud_run_id is None: + raise ValueError('dbt_cloud_run_id cannot be None.') + if error_message is None: + raise ValueError('error_message cannot be None.') + if dbt_errors_dict is None: + raise ValueError('dbt_errors_dict cannot be None.') + + self.dbt_cloud_run_id = dbt_cloud_run_id + self.error_message = error_message + self.dbt_errors_dict = dbt_errors_dict + + super(AirflowException, self).__init__(error_message, *args, **kwargs) + def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0, params=None): """ Create a dependency from one or more tasks on a set of models succeeding @@ -64,4 +82,4 @@ def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_mode check_upstream_dbt_job_state >> check_dbt_model_successful - return dbt_job_task >> check_dbt_model_results >> downstream_tasks \ No newline at end of file + return dbt_job_task >> check_dbt_model_results >> downstream_tasks diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 68d1c1e..0324960 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -8,6 +8,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from ..hooks.dbt_cloud_hook import DbtCloudHook 
from ..operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator +from ..helpers import DbtCloudRunException class DbtCloudRunAndWatchJobOperator(DbtCloudRunJobOperator): @@ -61,12 +62,31 @@ def poke(self, run_id): run_status = dbt_cloud_hook.get_run_status(run_id=run_id) self.log.info('State of Run ID {}: {}'.format(run_id, run_status)) - TERMINAL_RUN_STATES = ['Success', 'Error', 'Cancelled'] - FAILED_RUN_STATES = ['Error', 'Cancelled'] + if run_status.strip() == 'Cancelled': + raise AirflowException(f'dbt cloud Run ID {run_id} Cancelled.') + + elif run_status.strip() == 'Error': + run_results = dbt_cloud_hook.get_all_run_results(run_id=run_id) + manifest = dbt_cloud_hook.get_run_manifest(run_id=run_id) - if run_status.strip() in FAILED_RUN_STATES: - raise AirflowException('dbt cloud Run ID {} Failed.'.format(run_id)) - if run_status.strip() in TERMINAL_RUN_STATES: + errors = {} + fail_states = {result['unique_id'] for result in run_results if result['status'] in ['error', 'failure']} + for unique_id in fail_states: + errors[unique_id] = { + 'tags': manifest['nodes'][unique_id]['tags'], + 'resource_type': manifest['nodes'][unique_id]['resource_type'], + 'depends_on': manifest['nodes'][unique_id]['depends_on']['nodes'], + 'parent_models': {model: manifest['nodes'][model]['tags'] for model in manifest['nodes'][unique_id]['depends_on']['nodes']} + } + + raise DbtCloudRunException( + dbt_cloud_run_id=run_id, + error_message=f'dbt cloud Run ID {run_id} Failed.', + dbt_errors_dict=errors + ) + + elif run_status.strip() == 'Success': return True + else: return False From 74b8e7d9d12f1a8449b1fc7014fe507f4c63ca1b Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 5 Mar 2024 15:49:54 -0800 Subject: [PATCH 26/36] remove extra imports --- .../operators/dbt_cloud_run_and_watch_job_operator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py 
b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 0324960..a88880c 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -1,9 +1,6 @@ # -*- coding: utf-8 -*- -import json -import requests import time -from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException, AirflowSkipException from ..hooks.dbt_cloud_hook import DbtCloudHook From 707f39d22529d858458e90fc237e1b897a566368 Mon Sep 17 00:00:00 2001 From: Andrew Pope Date: Thu, 14 Mar 2024 17:35:11 -0600 Subject: [PATCH 27/36] include account_id and project_id in the exception (#9) Co-authored-by: Andrew Pope --- dbt_cloud_plugin/helpers.py | 8 +++++++- .../operators/dbt_cloud_run_and_watch_job_operator.py | 9 ++++++++- dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py | 2 +- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/dbt_cloud_plugin/helpers.py b/dbt_cloud_plugin/helpers.py index d2a69c5..efc4fea 100644 --- a/dbt_cloud_plugin/helpers.py +++ b/dbt_cloud_plugin/helpers.py @@ -10,15 +10,21 @@ class DbtCloudRunException(AirflowException): @apply_defaults - def __init__(self, dbt_cloud_run_id: int, error_message: str, dbt_errors_dict: dict, *args, **kwargs): + def __init__(self, dbt_cloud_run_id: int, dbt_cloud_account_id: int, dbt_cloud_project_id: int, error_message: str, dbt_errors_dict: dict, *args, **kwargs): if dbt_cloud_run_id is None: raise ValueError('dbt_cloud_run_id cannot be None.') + if dbt_cloud_account_id is None: + raise ValueError('dbt_cloud_account_id cannot be None.') + if dbt_cloud_project_id is None: + raise ValueError('dbt_cloud_project_id cannot be None.') if error_message is None: raise ValueError('error_message cannot be None.') if dbt_errors_dict is None: raise ValueError('dbt_errors_dict cannot be None.') self.dbt_cloud_run_id = dbt_cloud_run_id +
self.dbt_cloud_account_id = dbt_cloud_account_id + self.dbt_cloud_project_id = dbt_cloud_project_id self.error_message = error_message self.dbt_errors_dict = dbt_errors_dict diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index a88880c..646c797 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -31,8 +31,13 @@ def __init__(self, super(DbtCloudRunAndWatchJobOperator, self).__init__(*args, **kwargs) def execute(self, **kwargs): - run_id = super(DbtCloudRunAndWatchJobOperator, self).execute(**kwargs) + response = super(DbtCloudRunAndWatchJobOperator, self).execute(**kwargs) + run_id = response['id'] + self.account_id = response['job']['account_id'] + self.project_id = response['job']['project_id'] + self.environment_id = response['job']['environment_id'] + # basically copy-pasting the Sensor code self.log.info(f'Starting poke for job {run_id}') try_number = 1 @@ -78,6 +83,8 @@ def poke(self, run_id): raise DbtCloudRunException( dbt_cloud_run_id=run_id, + dbt_cloud_account_id=self.account_id, + dbt_cloud_project_id=self.project_id, error_message=f'dbt cloud Run ID {run_id} Failed.', dbt_errors_dict=errors ) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index 3006230..f5f6236 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -65,4 +65,4 @@ def execute(self, **kwargs): run_id = trigger_resp['id'] self.xcom_push(kwargs['context'], 'dbt_cloud_run_id', run_id) - return run_id + return trigger_resp From e2acb20d556aa361c829e80967ecd10dea39d0cb Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 3 Apr 2024 09:13:00 -0700 Subject: [PATCH 28/36] added retries and handling for alerting edge case --- 
dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 23 ++++++++++++++----- .../dbt_cloud_run_and_watch_job_operator.py | 3 ++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 24f3fb3..1bf9f94 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -1,3 +1,5 @@ +import time + from ..dbt_cloud.dbt_cloud import DbtCloud from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException @@ -91,13 +93,22 @@ def get_all_run_results(self, run_id): job = dbt_cloud.get_job(job_id) total_steps = len(job['execute_steps']) - # the first 3 steps of a dbt Cloud job are always the same and - # never have any run results + # the first 3 steps of a dbt Cloud job are always the same and never have any run results + # occasionally, the run_results.json file can take a few seconds to generate starting_step = 4 - all_run_results = [] - for step in range(starting_step, starting_step + total_steps): - run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step) - all_run_results.extend(run_results['results']) + attempts = 0 + while attempts < 3: + all_run_results = [] + try: + for step in range(starting_step, starting_step + total_steps): + run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step) + all_run_results.extend(run_results['results']) + break + except RuntimeError as e: + attempts += 1 + if attempts == 3: + raise e + time.sleep(15) return all_run_results diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 646c797..2ef30f9 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -72,8 +72,9 @@ def poke(self, run_id): manifest = dbt_cloud_hook.get_run_manifest(run_id=run_id) errors 
= {} - fail_states = {result['unique_id'] for result in run_results if result['status'] in ['error', 'failure']} + fail_states = {result['unique_id'] for result in run_results if result['status'] in ['error', 'failure', 'fail']} for unique_id in fail_states: + self.log.info(manifest['nodes'][unique_id]['tags']) errors[unique_id] = { 'tags': manifest['nodes'][unique_id]['tags'], 'resource_type': manifest['nodes'][unique_id]['resource_type'], From 3624ff7b791b5db8812f788a47b696e8d097d270 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 3 Apr 2024 09:22:23 -0700 Subject: [PATCH 29/36] remove test logging --- .../operators/dbt_cloud_run_and_watch_job_operator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 2ef30f9..6ac462d 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -74,7 +74,6 @@ def poke(self, run_id): errors = {} fail_states = {result['unique_id'] for result in run_results if result['status'] in ['error', 'failure', 'fail']} for unique_id in fail_states: - self.log.info(manifest['nodes'][unique_id]['tags']) errors[unique_id] = { 'tags': manifest['nodes'][unique_id]['tags'], 'resource_type': manifest['nodes'][unique_id]['resource_type'], From f04457f2ec7f2584205d239ee41a62317a42fe0e Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 10 Apr 2024 15:54:19 -0700 Subject: [PATCH 30/36] updated parsing logic for dbt 1.7 --- .../dbt_cloud_run_and_watch_job_operator.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 6ac462d..0f7cc0b 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ 
b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -73,19 +73,24 @@ def poke(self, run_id): errors = {} fail_states = {result['unique_id'] for result in run_results if result['status'] in ['error', 'failure', 'fail']} + for unique_id in fail_states: + tags = manifest['nodes'][unique_id]['tags'] + resource_type = manifest['nodes'][unique_id]['resource_type'] + depends_on = manifest['nodes'][unique_id]['depends_on']['nodes'] + parent_models = {model: manifest['nodes'][model]['tags'] for model in manifest['nodes'][unique_id]['depends_on']['nodes'] if model not in manifest['sources']} errors[unique_id] = { - 'tags': manifest['nodes'][unique_id]['tags'], - 'resource_type': manifest['nodes'][unique_id]['resource_type'], - 'depends_on': manifest['nodes'][unique_id]['depends_on']['nodes'], - 'parent_models': {model: manifest['nodes'][model]['tags'] for model in manifest['nodes'][unique_id]['depends_on']['nodes']} + 'tags': tags, + 'resource_type': resource_type, + 'depends_on': depends_on, + 'parent_models': parent_models, } raise DbtCloudRunException( dbt_cloud_run_id=run_id, dbt_cloud_account_id=self.account_id, dbt_cloud_project_id=self.project_id, - error_message=f'dbt cloud Run ID {run_id} Failed.', + error_message=f'dbt cloud Run ID {run_id} Failed.', dbt_errors_dict=errors ) From feda0f21b523514fd6659167119ee701110415d1 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 14 May 2024 16:40:26 -0700 Subject: [PATCH 31/36] go backwards to last good manifest --- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 1bf9f94..92e7676 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -106,8 +106,11 @@ def get_all_run_results(self, run_id): break except RuntimeError as e: attempts += 1 - if attempts == 3: + if attempts == 3 and step == starting_step: 
raise e + elif attempts == 3: + total_steps = step - starting_step + attempts = 0 time.sleep(15) return all_run_results From ee08dcb3e2733b27090f7d248e219e819f2fe785 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 14 May 2024 20:19:59 -0700 Subject: [PATCH 32/36] refactor --- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 92e7676..3e287fb 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -97,8 +97,8 @@ def get_all_run_results(self, run_id): # occasionally, the run_results.json file can take a few seconds to generate starting_step = 4 attempts = 0 + all_run_results = [] while attempts < 3: - all_run_results = [] try: for step in range(starting_step, starting_step + total_steps): run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step) @@ -106,11 +106,10 @@ def get_all_run_results(self, run_id): break except RuntimeError as e: attempts += 1 - if attempts == 3 and step == starting_step: + if attempts == 3 and len(all_run_results) == 0: raise e elif attempts == 3: - total_steps = step - starting_step - attempts = 0 + return all_run_results time.sleep(15) return all_run_results From 084f57b6840563386192e3c0b6dd746e69707dc4 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Tue, 14 May 2024 20:46:43 -0700 Subject: [PATCH 33/36] refactor+ --- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 3e287fb..4b4692d 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -95,21 +95,27 @@ def get_all_run_results(self, run_id): # the first 3 steps of a dbt Cloud job are always the same and never have any run results # occasionally, the 
run_results.json file can take a few seconds to generate - starting_step = 4 + current_index = 4 + final_index = current_index + total_steps attempts = 0 - all_run_results = [] + all_run_results = [] + while attempts < 3: try: - for step in range(starting_step, starting_step + total_steps): + for step in range(current_index, final_index): run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step) all_run_results.extend(run_results['results']) + current_index += 1 break except RuntimeError as e: attempts += 1 + if attempts == 3 and len(all_run_results) == 0: raise e elif attempts == 3: + # sometimes the last step is not available, so we need to return what we have return all_run_results + time.sleep(15) return all_run_results From 9f2ab8f5568b1377260b7e20a22009ef8ac086e7 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Wed, 15 May 2024 07:01:52 -0700 Subject: [PATCH 34/36] added warning --- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index 4b4692d..1076693 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -1,4 +1,5 @@ import time +import warnings from ..dbt_cloud.dbt_cloud import DbtCloud from airflow.hooks.base_hook import BaseHook @@ -114,6 +115,7 @@ def get_all_run_results(self, run_id): raise e elif attempts == 3: # sometimes the last step is not available, so we need to return what we have + warnings.warn(f'Only {len(all_run_results)} of {total_steps} steps were available in run_results.json') return all_run_results time.sleep(15) From ebf0e8893c27e2ed2cfef97db5d6f76644650919 Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Fri, 12 Sep 2025 12:01:58 -0700 Subject: [PATCH 35/36] ignore transient issues --- .../operators/dbt_cloud_run_and_watch_job_operator.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git 
a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index 0f7cc0b..f6dd830 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -61,7 +61,12 @@ def run_duration(): def poke(self, run_id): self.log.info('Sensor checking state of dbt cloud run ID: %s', run_id) dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) - run_status = dbt_cloud_hook.get_run_status(run_id=run_id) + try: + run_status = dbt_cloud_hook.get_run_status(run_id=run_id) + except Exception as e: + # Tolerate transient connectivity errors during polling; keep the job alive + self.log.warning('Transient error while fetching run status for %s: %s', run_id, str(e)) + return False self.log.info('State of Run ID {}: {}'.format(run_id, run_status)) if run_status.strip() == 'Cancelled': From 7ffe5244afbe88a3180e088aa27a552b77d97a8e Mon Sep 17 00:00:00 2001 From: UrbanPancake Date: Fri, 12 Sep 2025 12:03:44 -0700 Subject: [PATCH 36/36] ignore only transient --- .../operators/dbt_cloud_run_and_watch_job_operator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py index f6dd830..95d2b39 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_and_watch_job_operator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import time +import requests from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException, AirflowSkipException @@ -63,9 +64,9 @@ def poke(self, run_id): dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) try: run_status = dbt_cloud_hook.get_run_status(run_id=run_id) - except Exception as e: + except 
(requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: # Tolerate transient connectivity errors during polling; keep the job alive - self.log.warning('Transient error while fetching run status for %s: %s', run_id, str(e)) + self.log.warning('Transient network error while fetching run status for %s: %s', run_id, str(e)) return False self.log.info('State of Run ID {}: {}'.format(run_id, run_status))