I am looking for a way to retrieve the last status of DAG-A from another DAG, DAG-B, and then pass that status as a string within DAG-B.
I cannot find anything about this online. I don't want to check the status of DAG-A.
Say DAG-A triggers multiple times a day. Whenever DAG-B triggers (at a custom time), I want it to have the last status of DAG-A.
I only have code that sends a Slack message. Sending the Slack message from DAG-A is unfortunately not a good solution for my problem, and I would also like to learn how to return the status of other DAGs.
I want to pass the status in the message body.
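from datetime import datetime

from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def _get_message() -> str:
    return "Status of data_pipeline: "


# default_args is defined elsewhere in the DAG file.
with DAG("slack_dag", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", default_args=default_args, catchup=False
         ) as dag:
    send_slack_notification = SlackWebhookOperator(
        task_id="send_slack_notification",
        http_conn_id="slack_conn",
        message=_get_message(),
        channel="#test-public"
    )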
If you use the airflow.models.DagRun object, you can query DagRuns occurring between an execution_start_date and an execution_end_date. Below is code for dag_b, which contains the task get_dagrun_info that queries all runs of dag_a in the last 10 minutes and logs the state of the latest DagRun.
Here is the key snippet:
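from airflow.models import DagRun


@task
def get_dagrun_info():
    now = datetime.now(timezone.utc)
    dag_runs = DagRun.find(
        dag_id="dag_a",
        # timedelta(600) would mean 600 days; the window here is 10 minutes.
        execution_start_date=now - timedelta(minutes=10),
        execution_end_date=now
    )
    if not dag_runs:
        # Nothing ran in the window, so there is no latest run to index.
        logging.info("no runs of dag_a found in the last 10 minutes")
        return
    latest_run = dag_runs[-1]
    logging.info(latest_run)
    logging.info(f"state of latest run for {latest_run.dag_id} is {latest_run.state}")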
Here is the full DAG code:
import logging
from datetime import datetime, timezone, timedelta

from airflow.decorators import dag, task
from airflow.models import DagRun


@task
def get_dagrun_info():
    now = datetime.now(timezone.utc)
    dag_runs = DagRun.find(
        dag_id="dag_a",
        # timedelta(600) would mean 600 days; the window here is 10 minutes.
        execution_start_date=now - timedelta(minutes=10),
        execution_end_date=now
    )
    if not dag_runs:
        # Nothing ran in the window, so there is no latest run to index.
        logging.info("no runs of dag_a found in the last 10 minutes")
        return
    latest_run = dag_runs[-1]
    logging.info(latest_run)
    logging.info(f"state of latest run for {latest_run.dag_id} is {latest_run.state}")


@dag(
    dag_id="dag_b",
    schedule=None,
    start_date=datetime(2023, 8, 15),
    catchup=False
)
def my_dag():
    return get_dagrun_info()


my_dag()
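To get from the logged state to the message body asked about in the question, the state has to be turned into a string. Below is a minimal sketch of that step, written without Airflow so it can be run on its own. RunInfo and build_status_message are hypothetical names introduced for illustration; RunInfo only mimics the three DagRun attributes used here (dag_id, execution_date, state).

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Sequence


@dataclass
class RunInfo:
    """Hypothetical stand-in for the DagRun attributes used below."""
    dag_id: str
    execution_date: datetime
    state: str


def build_status_message(dag_id: str, runs: Sequence[RunInfo]) -> str:
    """Return a one-line status message for the most recent run, if any."""
    if not runs:
        # No run fell inside the queried window, so avoid indexing an empty list.
        return f"Status of {dag_id}: no runs found"
    # Pick the run with the newest execution date instead of relying on list order.
    latest = max(runs, key=lambda run: run.execution_date)
    return f"Status of {dag_id}: {latest.state}"


example_runs = [
    RunInfo("dag_a", datetime(2023, 8, 15, 9, 0, tzinfo=timezone.utc), "failed"),
    RunInfo("dag_a", datetime(2023, 8, 15, 12, 0, tzinfo=timezone.utc), "success"),
]
print(build_status_message("dag_a", example_runs))
print(build_status_message("dag_a", []))
```

In dag_b, the same function could probably be called with the list returned by DagRun.find, and the task could return the resulting string so that it is pushed to XCom. Whether the Slack operator can then pull it through a templated message depends on the installed Slack provider version, so treat that part as an assumption to verify.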