airflow

Google Cloud Composer Airflow: access one task's details (status, execution date, etc.) from another task


I created two tasks, task_1 and task_2, with the sole purpose of reading task_1's log details from task_2, but I am unable to fetch task_1's details (such as task_1.status, task_1.execution_date, and so on) inside task_2.

            # Code:
            import os, json, time, airflow, requests
            from airflow import DAG
            from datetime import datetime, timedelta, timezone
            from airflow.configuration import conf
            from airflow.models import Variable
            from airflow.utils.task_group import TaskGroup
            from airflow.operators.python_operator import PythonOperator

            # These are the functions of the DAG
            def get_task_1_log(**kwargs):
                task_instance = kwargs['task_instance']
                print(task_instance.task_id) # I could get the task_id of the task
                print(task_instance.dag_id) # I could get the dag_id of the task
                print(task_instance.execution_date) # I could get the execution_date of the task

            def get_task_2_log(**kwargs):
                task_instance = kwargs['task_instance']
                print(task_instance.task_id) # I could get the task_id of the task
                print(task_instance.dag_id) # I could get the dag_id of the task
                print(task_instance.execution_date) # I could get the execution_date of the task

            # initializing the default arguments
            default_args = {
                'start_date': datetime(2024, 1, 27),
                'retries': 1,
                'retry_delay': timedelta(minutes=5)
            }

            # DAG
            with DAG("Get_Task_Logs",
                    default_args=default_args,
                    description="Get_Task_Logs",
                    schedule_interval="07 06 * * *",
                    start_date=None,
                    ) as dag:
                
                # defining a task group
                with TaskGroup("my_group", tooltip="my_group") as my_group:

                    # using python operator I am calling the function to get the log of the task_1
                    call_task_1 = PythonOperator(
                        task_id="call_task_1",
                        python_callable=get_task_1_log,
                        trigger_rule='one_success'
                    )
                    
                    # using python operator I am calling the function to get the log of the task_2
                    call_task_2 = PythonOperator(
                        task_id="call_task_1",
                        python_callable=get_task_2_log,
                        trigger_rule='one_success'
                    )
                    
                    # calling the task
                    call_task_1 >> call_task_2 
                
                my_group

Using the above code I can get the log details of each task's own instance, but the issue is that whenever I fetch a task's status from within that same task, it always reports running. What I want is to get task_1's task instance inside task_2, so that I can read its information and check whether a particular task in a group has completed, and then use that condition to decide whether to proceed with other tasks.

I would also like to check whether all the tasks in a task group have completed, so that I can verify the whole group in one go before proceeding with another group, rather than comparing task by task.


Solution

  • task_1 and task_2 are part of my_group and my_another_group, so we can use TaskInstance(task, execution_date) to read a particular task's details, such as its current_state, execution_date, and more. It can be called from anywhere, whether from another function or from the same one. Note, however, that if you call current_state() from within the same task, the state will be "running", because at that point the task has not yet completed (or gone for a retry).

    import os
    import json
    import time
    import airflow
    import requests
    from airflow import DAG
    from datetime import datetime, timedelta, timezone
    from airflow.configuration import conf
    from airflow.models import Variable, TaskInstance
    from airflow.utils.task_group import TaskGroup
    from airflow.operators.python_operator import PythonOperator
    
    def get_task_1_log(**kwargs):
        task_instance = kwargs["task_instance"]
        print(f"task_1 execution_date: {task_instance.execution_date}")
        print("task_1 state: ", task_instance.current_state())
        # ... other task_1 logic
    
    
    def get_task_2_log(**kwargs):
        ti = kwargs["ti"]  # Access TaskInstance directly
        task_1_instance = TaskInstance(task=call_task_1, execution_date=ti.execution_date)
        print(f"task_1 status: {task_1_instance.current_state()}")
        # ... other task_2 logic
    
    
    default_args = {
        "start_date": datetime(2024, 2, 9),
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    }
    
    with DAG(
        "Get_Task_Logs_V0",
        default_args=default_args,
        description="Get_Task_Logs",
        schedule_interval="07 06 * * *",
        start_date=None,
        catchup=False,  # Prevent backfilling
    ) as dag:
    
        with TaskGroup("my_group", tooltip="my_group") as my_group:
            call_task_1 = PythonOperator(
                task_id="call_task_1",
                python_callable=get_task_1_log,
                provide_context=True,  # Pass task instance to callable
                dag=dag,  # Pass DAG object for TaskInstance access
            )
    
        with TaskGroup("my_another_group", tooltip="my_another_group") as my_another_group:
            call_task_2 = PythonOperator(
                task_id="call_task_2",
                python_callable=get_task_2_log,
                provide_context=True,  # Pass task instance to callable
                dag=dag,  # Pass DAG object for TaskInstance access
            )
    
        my_group >> my_another_group
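
  • For the second part of the question (checking whether every task in a group has finished before moving on), one option not shown in the code above is to read the states of all task instances of the current run and filter them by the group prefix. The following is a minimal sketch, assuming Airflow 2.x, where tasks inside a TaskGroup get task_ids prefixed with the group id (for example my_group.call_task_1) and the DagRun is available in the task context; the callable name check_my_group_done is illustrative.

    from airflow.utils.state import State

    def check_my_group_done(**kwargs):
        # kwargs["dag_run"] is the DagRun of the current run.
        dag_run = kwargs["dag_run"]

        # Task ids inside a TaskGroup are prefixed with the group id,
        # e.g. "my_group.call_task_1".
        group_prefix = "my_group."
        group_tis = [
            ti for ti in dag_run.get_task_instances()
            if ti.task_id.startswith(group_prefix)
        ]

        # Treat success (and skipped) as "completed"; anything else means the
        # group is still in progress, has failed, or has not run yet.
        all_done = all(ti.state in (State.SUCCESS, State.SKIPPED) for ti in group_tis)
        print(f"my_group finished: {all_done}")
        return all_done

    Wired up as another PythonOperator placed between my_group and my_another_group, this lets you evaluate the whole group's outcome in one place instead of comparing it task by task.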