
How do I dispatch dbt logs from Airflow to the relevant owner?


I am running dbt (v0.19.0) on Apache Airflow (v2.0.2) using the KubernetesPodOperator, and sending alerts to Slack on failures. Within dbt there are multiple models from multiple owners, with cross-dependencies between them, and they are all run together with a single dbt run command.

For example:

KubernetesPodOperator(
  task_id="run_models",
  name="run_models",
  image="my-dbt-image",  # image containing the dbt project
  cmds=["dbt"],
  arguments=["run"],
  on_failure_callback=send_slack_alert,
)

I am trying to make sure that each model owner gets the alert that belongs to them in their relevant channel.

To explain my problem better, let's say there are two models within dbt: Model-A and Model-B. Model-A is owned by the A-Team and Model-B by the B-Team. With this approach, because there is a single dbt run command, a failure of Model-A appears in the shared logs for both Model-A and Model-B. Both the A-Team and the B-Team have their own alert channels, yet since dbt runs as one command, all alerts are sent to a common channel.

Now imagine having many models (Model-A, Model-B, ..., Model-Z). How can I improve the existing process so that failures in Model-A are sent to the A-Team alert channel, failures in Model-B to the B-Team alert channel, and so on?

How do I dispatch errors from dbt (running within Airflow) to the relevant owner to make alerts actionable?


Solution

  • You're likely to end up with n models owned by m teams, with many models per team.

    Your simplest change would be to tag each dbt model with its owning team, then run each team's models as a separate task whose failure callback alerts that team, e.g.

    KubernetesPodOperator(
      task_id="run_models_team1",
      name="run_models_team1",
      image="my-dbt-image",  # image containing the dbt project
      cmds=["dbt"],
      arguments=["run", "-m", "tag:team1"],
      on_failure_callback=send_slack_alert_team1,
    )
    

    You could consider passing arguments to a single alert function rather than writing a custom callback per team (see Pass other arguments to on_failure_callback).
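As a minimal sketch of that idea: one generic alert function can be specialised per team with `functools.partial`, so each task gets a callback bound to its team's channel. The `send_slack_alert` body and the channel names are placeholders (the actual Slack posting, e.g. via a webhook hook, is assumed to exist elsewhere), and the `context` dict is simplified compared to Airflow's real task context.

```python
from functools import partial

def send_slack_alert(context, channel):
    # Hypothetical helper: in a real DAG this would post the failure details
    # to the given Slack channel (e.g. via SlackWebhookHook); here it just
    # builds the message so the wiring is visible.
    return f"Posting failure of {context['task_id']} to {channel}"

# One callback per team, all built from the same function:
alert_team_a = partial(send_slack_alert, channel="#team-a-alerts")
alert_team_b = partial(send_slack_alert, channel="#team-b-alerts")
```

Each partial can then be passed as `on_failure_callback` to the corresponding team's task, so you maintain one alert function instead of one per team.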

    This will work as long as you can run your models in per-owner groups, but it can run into issues if there are dependencies across owners.

    Alternatively, you can compose a dynamic DAG from your dbt models, running one model per task, e.g. as described at https://www.astronomer.io/blog/airflow-dbt-1.

    You could then assign each team's Slack callback in that same dynamic loop.
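A sketch of that loop, assuming each model records its owning team under a `meta` key in its config (e.g. `{{ config(meta={'team': 'team_a'}) }}` — a project convention, not a dbt built-in) and that channels follow a `#<team>-alerts` naming scheme. The function parses dbt's generated `manifest.json` and emits one task definition per model; in the DAG you would feed each dict into a KubernetesPodOperator and wire dependencies from the manifest's `depends_on` data.

```python
import json

def model_tasks_from_manifest(manifest_path):
    """Build one task definition per dbt model, routed to its owner's channel.

    Assumes each model sets a 'team' meta key in its config; models without
    one fall back to a 'default' channel.
    """
    with open(manifest_path) as f:
        manifest = json.load(f)
    tasks = []
    for node_id, node in manifest["nodes"].items():
        if node["resource_type"] != "model":
            continue  # skip tests, seeds, etc.
        team = node["config"].get("meta", {}).get("team", "default")
        tasks.append({
            "task_id": f"run_{node['name']}",
            "arguments": ["run", "-m", node["name"]],  # one dbt model per task
            "channel": f"#{team}-alerts",  # hypothetical channel convention
        })
    return tasks
```

Each returned dict carries everything needed to build the operator and its team-specific callback in one pass, so adding a new model (or reassigning an owner) is just a change to the model's config.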