Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
ddc7624
sensor now fails on cancelled runs
Jan 27, 2021
fab9d7a
updates so this can be installed as a package
Jan 27, 2021
bed6444
updates to send optional params to run job
Jan 28, 2021
510ade5
operator now accepts steps_override parameter
Jan 28, 2021
ad80010
adds new run and watch operator to facilitate retries
Feb 3, 2021
e8e7f91
small fix to error
Feb 3, 2021
f54acdb
Environment id filter (#1)
aapope Dec 7, 2021
4d0a89c
add a new operator to check specific model results (#2)
aapope Sep 26, 2023
2d493a0
more meaningful errors for DbtCloudCheckModelResultOperator
Sep 27, 2023
4a45096
fix for formatting of the error message
Sep 27, 2023
dd76a02
fix for string version of None
Sep 27, 2023
0cd6686
added option to set number of retries for generate model deps
UrbanPancake Oct 24, 2023
f0a3279
Revert "added option to set number of retries for generate model deps"
UrbanPancake Oct 24, 2023
2940fe7
added retries to generate model dependency
UrbanPancake Oct 24, 2023
10f3ef5
Merge pull request #3 from aapope/add-retries-to-generate-model-depen…
UrbanPancake Oct 24, 2023
47da0f9
changed default upstream check
UrbanPancake Dec 6, 2023
2766372
added short circuit operator and placed in task group
UrbanPancake Dec 6, 2023
9d821da
Merge branch 'master' into aapope/dbt-cloud-plugin/change-dbt-sensor-…
UrbanPancake Dec 6, 2023
f479953
fixed period issue
UrbanPancake Dec 6, 2023
d0a9606
Merge pull request #5 from aapope/aapope/dbt-cloud-plugin/fix-task-id…
UrbanPancake Dec 6, 2023
4ac9be1
Merge pull request #4 from aapope/aapope/dbt-cloud-plugin/change-dbt-…
UrbanPancake Dec 7, 2023
fab815c
fixed to local version match
UrbanPancake Dec 7, 2023
3dccb78
fixed callback
UrbanPancake Dec 7, 2023
30820f9
updated lambda function
UrbanPancake Dec 7, 2023
62d50de
Merge pull request #6 from aapope/update-task-state
UrbanPancake Dec 7, 2023
b2c93e7
change
UrbanPancake Dec 7, 2023
9151185
changed task
UrbanPancake Dec 7, 2023
36bc97f
default notification rules work
UrbanPancake Dec 7, 2023
ecfc497
added params to dag generation
UrbanPancake Feb 13, 2024
0e9f975
Merge pull request #7 from aapope/add-params-to-generate-model-depend…
UrbanPancake Feb 13, 2024
43148c0
Added Alerting For Model Owners
UrbanPancake Mar 5, 2024
74b8e7d
remove extra imports
UrbanPancake Mar 5, 2024
79d52b1
Merge pull request #8 from aapope/feature/model-ownership-slack-notif…
UrbanPancake Mar 5, 2024
707f39d
include account_id and project_id in the exception (#9)
aapope Mar 14, 2024
e2acb20
added retries and handling for alerting edge case
UrbanPancake Apr 3, 2024
3624ff7
remove test logging
UrbanPancake Apr 3, 2024
50d447d
Merge pull request #10 from aapope/improve-retries-and-alerting
UrbanPancake Apr 3, 2024
f04457f
updated parsing logic for dbt 1.7
UrbanPancake Apr 10, 2024
07beb08
Merge pull request #11 from aapope/source-parsing-fix
UrbanPancake Apr 10, 2024
feda0f2
go backwards to last good manifest
UrbanPancake May 14, 2024
ee08dcb
refactor
UrbanPancake May 15, 2024
084f57b
refactor+
UrbanPancake May 15, 2024
9f2ab8f
added warning
UrbanPancake May 15, 2024
ecd8f44
Merge pull request #12 from aapope/use-last-valid-link-data
UrbanPancake May 15, 2024
ebf0e88
ignore transient issues
UrbanPancake Sep 12, 2025
7ffe524
ignore only transient
UrbanPancake Sep 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dbt_cloud_plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1 @@
from airflow.plugins_manager import AirflowPlugin
from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook
from dbt_cloud_plugin.operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator
from dbt_cloud_plugin.sensors.dbt_cloud_run_sensor import DbtCloudRunSensor

class DbtCloudPlugin(AirflowPlugin):
name = "dbt_cloud_plugin"
operators = [DbtCloudRunJobOperator]
hooks = [DbtCloudHook]
sensors = [DbtCloudRunSensor]
from .helpers import generate_dbt_model_dependency
33 changes: 27 additions & 6 deletions dbt_cloud_plugin/dbt_cloud/dbt_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import requests
import time

from airflow.exceptions import AirflowException


class DbtCloud(object):
"""
Class for interacting with the dbt Cloud API
Expand All @@ -25,19 +28,23 @@ def _get(self, url_suffix):
if response.status_code == 200:
return json.loads(response.content)
else:
raise RuntimeError(response.content)
raise RuntimeError(f'Error getting URL {url}:\n{str(response.content)}')

def _post(self, url_suffix, data=None):
url = self.api_base + url_suffix
headers = {'Authorization': 'token %s' % self.api_token}
response = requests.post(url, headers=headers, data=data)
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return json.loads(response.content)
else:
raise RuntimeError(response.content)

def list_jobs(self):
return self._get('/accounts/%s/jobs/' % self.account_id).get('data')
def list_jobs(self, environment_id=None):
    """Return all jobs in the account, optionally restricted to one environment.

    :param environment_id: if given, only jobs whose environment_id matches
        (compared as strings, so int/str mismatches don't matter) are returned.
    """
    all_jobs = self._get('/accounts/%s/jobs/' % self.account_id).get('data')
    if environment_id is None:
        return all_jobs
    wanted = str(environment_id)
    return [job for job in all_jobs if str(job['environment_id']) == wanted]

def get_run(self, run_id):
    """Fetch the data payload for a single dbt Cloud run."""
    response = self._get('/accounts/%s/runs/%s/' % (self.account_id, run_id))
    return response.get('data')
Expand All @@ -56,8 +63,8 @@ def try_get_run(self, run_id, max_tries=3):

raise RuntimeError("Too many failures ({}) while querying for run status".format(run_id))

def run_job(self, job_name, data=None):
jobs = self.list_jobs()
def run_job(self, job_name, data=None, environment_id=None):
jobs = self.list_jobs(environment_id=environment_id)

job_matches = [j for j in jobs if j['name'] == job_name]

Expand All @@ -67,3 +74,17 @@ def run_job(self, job_name, data=None):
job_def = job_matches[0]
trigger_resp = self.trigger_job_run(job_id=job_def['id'], data=data)
return trigger_resp

def get_artifact(self, run_id, artifact_filename, step=None):
    """Download a run artifact (e.g. manifest.json or run_results.json).

    :param run_id: dbt Cloud run to pull the artifact from.
    :param artifact_filename: name of the artifact file.
    :param step: optional 1-based step index; without it the API returns
        the artifact from the last step that produced one.
    """
    query_string = '' if step is None else f'?step={step}'
    url = f'/accounts/{self.account_id}/runs/{run_id}/artifacts/{artifact_filename}{query_string}'
    return self._get(url)

def get_job(self, job_id):
    """Fetch the data payload for a single job definition."""
    payload = self._get(f'/accounts/{self.account_id}/jobs/{job_id}')
    return payload.get('data')

91 changes: 91 additions & 0 deletions dbt_cloud_plugin/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from airflow.utils.task_group import TaskGroup
from airflow.models import BaseOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException

from .operators.dbt_cloud_check_model_result_operator import DbtCloudCheckModelResultOperator
from .operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator


class DbtCloudRunException(AirflowException):
    """
    Exception carrying the full context of a failed dbt Cloud run so that
    callbacks (e.g. alerting) can link back to the run and report per-model
    errors.

    :param dbt_cloud_run_id: ID of the failed dbt Cloud run.
    :param dbt_cloud_account_id: dbt Cloud account the run belongs to.
    :param dbt_cloud_project_id: dbt Cloud project the run belongs to.
    :param error_message: Human-readable summary; becomes the exception message.
    :param dbt_errors_dict: Mapping of model/test identifiers to their error details.
    :raises ValueError: if any of the required arguments is None.
    """

    # NOTE: the previous @apply_defaults decorator is meant for Operators,
    # not exceptions, so it was dropped here.
    def __init__(self, dbt_cloud_run_id: int, dbt_cloud_account_id: int, dbt_cloud_project_id: int, error_message: str, dbt_errors_dict: dict, *args, **kwargs):
        if dbt_cloud_run_id is None:
            raise ValueError('dbt_cloud_run_id cannot be None.')
        # BUGFIX: these two checks previously reported 'dbt_cloud_run_id'
        # in their messages (copy-paste error).
        if dbt_cloud_account_id is None:
            raise ValueError('dbt_cloud_account_id cannot be None.')
        if dbt_cloud_project_id is None:
            raise ValueError('dbt_cloud_project_id cannot be None.')
        if error_message is None:
            raise ValueError('error_message cannot be None.')
        if dbt_errors_dict is None:
            raise ValueError('dbt_errors_dict cannot be None.')

        self.dbt_cloud_run_id = dbt_cloud_run_id
        self.dbt_cloud_account_id = dbt_cloud_account_id
        self.dbt_cloud_project_id = dbt_cloud_project_id
        self.error_message = error_message
        self.dbt_errors_dict = dbt_errors_dict

        # Call the direct parent's __init__; the original passed
        # AirflowException to super(), skipping AirflowException.__init__.
        super().__init__(error_message, *args, **kwargs)

def generate_dbt_model_dependency(dbt_job_task, downstream_tasks, dependent_models, ensure_models_ran=True, retries=0, params=None):
    """
    Create a dependency from one or more tasks on a set of models succeeding
    in a dbt task. This function generates a new DbtCloudCheckModelResultOperator
    task between dbt_job_task and downstream_tasks, checking that dependent_models
    all ran successfully.

    :param dbt_job_task: The dbt Cloud operator which kicked off the run you want to check.
        Both the credentials and the run_id will be pulled from this task.
    :type dbt_job_task: DbtCloudRunJobOperator or DbtCloudRunAndWatchJobOperator
    :param downstream_tasks: The downstream task(s) which depend on the model(s) succeeding.
        Can be either a single task, a single TaskGroup, or a list of tasks.
    :type downstream_tasks: BaseOperator or TaskGroup or list[BaseOperator] or list[TaskGroup]
    :param dependent_models: The name(s) of the model(s) to check. See
        DbtCloudCheckModelResultOperator for more details.
    :type dependent_models: str or list[str]
    :param ensure_models_ran: Whether to require that the dependent_models actually ran in
        the run. If False, it will silently ignore models that didn't run.
    :type ensure_models_ran: bool, default True
    :param retries: Number of Airflow retries set on the generated check task.
    :type retries: int, default 0
    :param params: Optional Airflow ``params`` dict passed through to the generated check task.
    :type params: dict or None
    """

    if not isinstance(dbt_job_task, DbtCloudRunJobOperator):
        raise TypeError('dbt_job_task must be of type DbtCloudRunJobOperator or DbtCloudRunAndWatchOperator')

    if isinstance(downstream_tasks, list):
        if len(downstream_tasks) == 0:
            raise ValueError('You must pass at least one task in downstream_tasks')
        # BUGFIX: validate every element, not just the first one, so a bad
        # entry anywhere in the list fails fast here instead of deep inside
        # Airflow's dependency wiring. (Also fixes the 'TaskGRoup' typo.)
        for task in downstream_tasks:
            if not isinstance(task, (BaseOperator, TaskGroup)):
                raise TypeError('The elements of the downstream_tasks list must be of type BaseOperator or TaskGroup')
    elif not isinstance(downstream_tasks, (TaskGroup, BaseOperator)):
        raise TypeError('downstream_tasks must be of one of the following types: BaseOperator, TaskGroup, or a list of one of those two')

    if isinstance(dependent_models, str):
        dependent_models = [dependent_models]
    model_ids = '__'.join(dependent_models)
    task_id = f'check_dbt_model_results__{dbt_job_task.task_id}__{model_ids}'
    # Keep the group id within Airflow's id length limit; dots are illegal in ids.
    task_id = task_id[:255].replace('.', '__')

    with TaskGroup(group_id=task_id) as check_dbt_model_results:
        # Only proceed when the upstream dbt job actually finished (success or
        # failed); if it was skipped/upstream_failed there is no run to inspect.
        check_upstream_dbt_job_state = ShortCircuitOperator(
            task_id='check_upstream_dbt_job_state',
            python_callable=lambda **context: context['dag_run'].get_task_instance(dbt_job_task.task_id).state in ['success', 'failed'],
            trigger_rule='all_done',
            provide_context=True
        )

        check_dbt_model_successful = DbtCloudCheckModelResultOperator(
            task_id='check_dbt_model_successful',
            dbt_cloud_conn_id=dbt_job_task.dbt_cloud_conn_id,
            dbt_cloud_run_id=f'{{{{ ti.xcom_pull(task_ids="{dbt_job_task.task_id}", key="dbt_cloud_run_id") }}}}',
            model_names=dependent_models,
            ensure_models_ran=ensure_models_ran,
            retries=retries,
            params=params
        )

        check_upstream_dbt_job_state >> check_dbt_model_successful

    return dbt_job_task >> check_dbt_model_results >> downstream_tasks
93 changes: 92 additions & 1 deletion dbt_cloud_plugin/hooks/dbt_cloud_hook.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from dbt_cloud_plugin.dbt_cloud.dbt_cloud import DbtCloud
import time
import warnings

from ..dbt_cloud.dbt_cloud import DbtCloud
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException

Expand Down Expand Up @@ -44,6 +47,20 @@ def get_conn(self):

return DbtCloud(dbt_cloud_account_id, dbt_cloud_api_token)

def _get_conn_extra(self):
conn = self.get_connection(self.dbt_cloud_conn_id).extra_dejson
config = {}
if 'git_branch' in conn:
config['git_branch'] = conn['git_branch']
if 'schema_override' in conn:
config['schema_override'] = conn['schema_override']
if 'target_name_override' in conn:
config['target_name_override'] = conn['target_name_override']
if 'environment_id' in conn:
config['environment_id'] = conn['environment_id']

return config

def get_run_status(self, run_id):
"""
Return the status of an dbt cloud run.
Expand All @@ -53,3 +70,77 @@ def get_run_status(self, run_id):
run = dbt_cloud.try_get_run(run_id=run_id)
status_name = RunStatus.lookup(run['status'])
return status_name

def get_run_manifest(self, run_id):
    """Fetch and return the parsed manifest.json artifact for a dbt Cloud run."""
    return self.get_conn().get_artifact(run_id, 'manifest.json')

def get_all_run_results(self, run_id):
    """
    Return the results array from run_results.json from a dbt Cloud run,
    concatenated across all (real) steps.

    Retries each step's artifact fetch up to 3 times (run_results.json can
    lag behind run completion). If some steps were fetched but the rest keep
    failing, returns the partial results with a warning; if nothing at all
    could be fetched, re-raises the last error.

    :param run_id: dbt Cloud run to collect results for.
    :return: list of result dicts, concatenated in step order.
    :raises RuntimeError: if no step's run_results.json could be fetched
        after 3 attempts.
    """
    dbt_cloud = self.get_conn()

    # first, determine the number of steps in this job
    # it will either be defined in the run or in the job definition
    run = dbt_cloud.get_run(run_id)
    total_steps = len(run['run_steps'])
    if total_steps == 0: # not defined on the run, check the job
        job_id = run['job_id']
        job = dbt_cloud.get_job(job_id)
        total_steps = len(job['execute_steps'])

    # the first 3 steps of a dbt Cloud job are always the same and never have any run results
    # occasionally, the run_results.json file can take a few seconds to generate
    # NOTE(review): assumes user-defined steps always start at index 4 — confirm
    # against the dbt Cloud API if step numbering ever changes.
    current_index = 4
    final_index = current_index + total_steps
    attempts = 0
    all_run_results = []

    while attempts < 3:
        try:
            # current_index is advanced after each successful fetch, so a retry
            # resumes where the previous pass failed instead of re-fetching.
            for step in range(current_index, final_index):
                run_results = dbt_cloud.get_artifact(run_id, 'run_results.json', step=step)
                all_run_results.extend(run_results['results'])
                current_index += 1
            break
        except RuntimeError as e:
            attempts += 1

            # Out of retries: fail hard only if we got nothing at all.
            if attempts == 3 and len(all_run_results) == 0:
                raise e
            elif attempts == 3:
                # sometimes the last step is not available, so we need to return what we have
                warnings.warn(f'Only {len(all_run_results)} of {total_steps} steps were available in run_results.json')
                return all_run_results

            # Give dbt Cloud time to finish writing the artifact before retrying.
            time.sleep(15)

    return all_run_results

def run_job(self, job_name, git_branch=None, schema_override=None,
            target_name_override=None, steps_override=None, environment_id=None):
    """Trigger a dbt Cloud job by name.

    Explicit arguments win over the corresponding defaults stored in the
    connection's extras; unset options are omitted from the request payload.

    :return: the trigger response from the dbt Cloud API.
    """
    client = self.get_conn()
    conn_defaults = self._get_conn_extra()

    payload = {'cause': 'Kicked off via Airflow'}
    # Optional settings: explicit argument first, then connection default.
    optional_settings = (
        ('git_branch', git_branch),
        ('schema_override', schema_override),
        ('target_name_override', target_name_override),
    )
    for key, explicit in optional_settings:
        value = explicit or conn_defaults.get(key, None)
        if value:
            payload[key] = value
    if steps_override:
        payload['steps_override'] = steps_override

    # get environment
    environment_id = environment_id or conn_defaults.get('environment_id', None)

    self.log.info(f'Triggering job {job_name} with data {payload}')

    return client.run_job(job_name, data=payload, environment_id=environment_id)
113 changes: 113 additions & 0 deletions dbt_cloud_plugin/operators/dbt_cloud_check_model_result_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
import json
import requests
import time

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException, AirflowSkipException
from ..hooks.dbt_cloud_hook import DbtCloudHook


SUCCESSFUL_STATUSES = ['success', 'pass']


class DbtModelException(Exception):
    """Base class for model-result check failures raised by
    DbtCloudCheckModelResultOperator."""
    pass


class DbtModelFailedException(DbtModelException):
    """Raised when a checked model (or one of its tests) finished with a
    non-successful status."""
    pass


class DbtModelNotRunException(DbtModelException):
    """Raised when a checked model did not execute in the run at all."""
    pass


class DbtCloudCheckModelResultOperator(BaseOperator):
    """
    Check the results of a dbt Cloud job to see whether the model(s) you
    care about ran successfully. Useful if you have a large dbt Cloud job,
    but each of your downstream tasks only requires a small subset of models
    to succeed.

    :param dbt_cloud_run_id: Run ID of a finished dbt Cloud job. Note that
        this task must not start running until the dbt Cloud job has completed,
        otherwise it will error out. See DbtCloudRunAndWatchJobOperator or
        DbtCloudRunSensor.
    :type dbt_cloud_run_id: str or int
    :param model_names: A single model name or list of model names to check.
        Tests and snapshot names also work. Note this must be the /name/, not the
        node ID. In addition to the model name(s) supplied, all of the tests for
        that model will also be checked (though unlike the model itself, they need
        not have been run).
    :type model_names: str or list[str]
    :param dbt_cloud_conn_id: dbt Cloud connection ID
    :type dbt_cloud_conn_id: str
    :param ensure_models_ran: Whether to ensure all of the model_names were actually
        executed in this run. Defaults to True to avoid accidentally mistyping the
        model name, negating the value of this check.
    :type ensure_models_ran: bool, default True
    :raises DbtModelFailedException: if a checked model or one of its tests
        finished with a non-successful status.
    :raises DbtModelNotRunException: if ensure_models_ran is True and a model
        is missing from the run results (or from the manifest entirely).
    """

    template_fields = ['dbt_cloud_run_id']

    @apply_defaults
    def __init__(self, dbt_cloud_run_id=None, model_names=None, ensure_models_ran=True, dbt_cloud_conn_id='dbt_default', *args, **kwargs):
        super(DbtCloudCheckModelResultOperator, self).__init__(*args, **kwargs)

        if dbt_cloud_run_id is None:
            raise AirflowException('No dbt Cloud run_id was supplied.')
        if model_names is None:
            raise AirflowException('No model names supplied.')

        self.dbt_cloud_conn_id = dbt_cloud_conn_id
        self.dbt_cloud_run_id = dbt_cloud_run_id
        # Normalize a single name to a list so execute() can always iterate.
        if isinstance(model_names, str):
            model_names = [model_names]
        self.model_names = model_names
        self.ensure_models_ran = ensure_models_ran

    def _find_test_dependencies_for_model_id(self, model_id, manifest):
        # Collect the node IDs of every test that declares a dependency on model_id.
        tests = []
        for node, values in manifest['nodes'].items():
            if values['resource_type'] != 'test':
                continue
            if model_id in values['depends_on']['nodes']:
                tests.append(node)
        return tests

    def _find_model_id_from_name(self, model_name, manifest):
        # Map a node *name* to its manifest unique_id; None when absent.
        for values in manifest['nodes'].values():
            if values['name'] == model_name:
                return values['unique_id']
        return None

    def _check_that_model_passed(self, model_name, manifest, run_results):
        # Verify that model_name, and every test depending on it, finished
        # with a successful status in this run's results.
        model_id = self._find_model_id_from_name(model_name, manifest)
        if model_id is None:
            # BUGFIX: the name isn't in the manifest at all (likely a typo).
            # Previously this fell through and raised the confusing
            # 'Model None was not run!'.
            if self.ensure_models_ran:
                raise DbtModelNotRunException(f'Model {model_name} was not found in the manifest!')
            return

        tests = self._find_test_dependencies_for_model_id(model_id, manifest)
        all_dependencies = [model_id] + tests
        self.log.info(f'Checking all dependencies for {model_name}: {all_dependencies}')

        ran_model = False
        for result in run_results:
            if result['unique_id'] == model_id:
                ran_model = True
            if result['unique_id'] in all_dependencies:
                if result['status'] not in SUCCESSFUL_STATUSES:
                    raise DbtModelFailedException(f'Dependency {result["unique_id"]} did not pass, status: {result["status"]}!')

        # Tests need not have run, but the model itself must have (unless disabled).
        if not ran_model and self.ensure_models_ran:
            raise DbtModelNotRunException(f'Model {model_name} ({model_id}) was not run!')

    def execute(self, **kwargs):
        # The templated run_id renders to the literal string 'None' when the
        # upstream XCom is missing, hence the explicit check for that value.
        if self.dbt_cloud_run_id is None or self.dbt_cloud_run_id in ('', 'None'):
            raise ValueError('dbt_cloud_run_id is empty!')
        dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id)
        manifest = dbt_cloud_hook.get_run_manifest(self.dbt_cloud_run_id)
        run_results = dbt_cloud_hook.get_all_run_results(self.dbt_cloud_run_id)

        for model in self.model_names:
            self._check_that_model_passed(model, manifest, run_results)

Loading