Return Status of DAG-A in another DAG-B


I am looking for a way to retrieve the last status of DAG-A in another DAG, DAG-B, and then pass that status as a string within DAG-B.

But I cannot find anything about this online. I don't want to check the status of DAG-A from within DAG-A itself.

Say DAG-A runs several times a day. Whenever DAG-B runs (on its own custom schedule), I want it to get the status of the latest DAG-A run.

What I have so far is code that sends a Slack message. Sending that Slack message from DAG-A is unfortunately not a good solution for my problem, and I would also like to learn how to get the status of other DAGs.

I want to include the status in the message body.

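from datetime import datetime

from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# default_args is assumed to be defined earlier in this file.


def _get_message() -> str:
    return "Status of data_pipeline: "


with DAG(
    "slack_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    send_slack_notification = SlackWebhookOperator(
        task_id="send_slack_notification",
        http_conn_id="slack_conn",
        message=_get_message(),
        channel="#test-public",
    )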

Solution

  • With the airflow.models.DagRun model you can call DagRun.find() to query the runs of another DAG whose execution (logical) date falls between execution_start_date and execution_end_date. Below is code for dag_b, which contains a task, get_dagrun_info, that queries all runs of dag_a from the last 10 minutes and logs the state of the latest run.

    Here is the key snippet:

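    import logging
    from datetime import datetime, timedelta, timezone

    from airflow.decorators import task
    from airflow.models import DagRun


    @task
    def get_dagrun_info():
        # timedelta's first positional argument is days, so use minutes=10 explicitly.
        now = datetime.now(timezone.utc)
        dag_runs = DagRun.find(
            dag_id="dag_a",
            execution_start_date=now - timedelta(minutes=10),
            execution_end_date=now,
        )
        if not dag_runs:
            logging.info("no runs of dag_a found in the window")
            return None
        # DagRun.find orders results by execution_date, so the last entry is the latest.
        latest_run = dag_runs[-1]
        logging.info(latest_run)
        logging.info(f"state of latest run for {latest_run.dag_id} is {latest_run.state}")
        # The return value is pushed to XCom so downstream tasks can use it.
        return latest_run.state
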

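    Here is the full DAG code:

    import logging
    from datetime import datetime, timedelta, timezone

    from airflow.decorators import dag, task
    from airflow.models import DagRun


    @task
    def get_dagrun_info():
        # timedelta's first positional argument is days, so use minutes=10 explicitly.
        now = datetime.now(timezone.utc)
        dag_runs = DagRun.find(
            dag_id="dag_a",
            execution_start_date=now - timedelta(minutes=10),
            execution_end_date=now,
        )
        if not dag_runs:
            logging.info("no runs of dag_a found in the window")
            return None
        # DagRun.find orders results by execution_date, so the last entry is the latest.
        latest_run = dag_runs[-1]
        logging.info(latest_run)
        logging.info(f"state of latest run for {latest_run.dag_id} is {latest_run.state}")
        # The returned state is pushed to XCom; a templated field downstream can read it
        # with {{ ti.xcom_pull(task_ids='get_dagrun_info') }}.
        return latest_run.state


    @dag(
        dag_id="dag_b",
        schedule=None,
        start_date=datetime(2023, 8, 15),
        catchup=False,
    )
    def my_dag():
        return get_dagrun_info()


    my_dag()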
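A common pitfall with the time window: datetime.timedelta takes days as its first positional argument, so timedelta(600) is a 600-day window rather than 10 minutes. The standalone sketch below (plain Python, no Airflow needed) shows the difference and computes both ends of the window from a single "now". Note also that DagRun.find compares the window against each run's execution_date (logical date); for a scheduled run that is the start of its data interval, so a run that finished moments ago can still fall outside a short window.

```python
from datetime import datetime, timedelta, timezone

# timedelta's first positional argument is days, so this spans 600 days.
assert timedelta(600) == timedelta(days=600)

# An explicit keyword makes the intended 10-minute window unambiguous.
window = timedelta(minutes=10)
assert window == timedelta(seconds=600)

# Compute "now" once so both ends of the window use the same instant.
end = datetime.now(timezone.utc)
start = end - window
print(start.isoformat(), "->", end.isoformat())
```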
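Indexing dag_runs[-1] raises IndexError when no run of dag_a falls inside the window, because DagRun.find then returns an empty list. The sketch below isolates the "pick the latest run safely" step so it can be tried without a metadata database; FakeRun is a hypothetical stand-in exposing only the DagRun attributes used here (dag_id, execution_date, state), and latest_state is a hypothetical helper name.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Sequence


@dataclass
class FakeRun:
    """Hypothetical stand-in for airflow.models.DagRun (only the fields used here)."""
    dag_id: str
    execution_date: datetime
    state: str


def latest_state(runs: Sequence[FakeRun]) -> Optional[str]:
    """Return the state of the most recent run, or None if there are no runs."""
    if not runs:
        # An empty result would make runs[-1] raise IndexError.
        return None
    # max() by execution_date does not rely on the input already being sorted.
    latest = max(runs, key=lambda run: run.execution_date)
    return latest.state


runs = [
    FakeRun("dag_a", datetime(2023, 8, 15, 8, tzinfo=timezone.utc), "success"),
    FakeRun("dag_a", datetime(2023, 8, 15, 12, tzinfo=timezone.utc), "failed"),
]
print(latest_state(runs))  # failed
print(latest_state([]))    # None
```

Using max() instead of [-1] is a deliberate choice here: it stays correct even if the list comes from a source that is not ordered by execution_date.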