I created two tasks, task_1 and task_2, whose sole purpose is to copy the task_1 log into another log, but I am unable to fetch the details of task_1 from within task_2, such as task_1.status, task_1.execution_date, and more.
# Code:
import os, json, time, airflow, requests
from airflow import DAG
from datetime import datetime, timedelta, timezone
from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
# These are the functions of the DAG
def get_task_1_log(**kwargs):
    """Print the identifying details of the task instance running this callable.

    Args:
        **kwargs: Task context; must contain a ``task_instance`` entry
            exposing ``task_id``, ``dag_id`` and ``execution_date``.
    """
    current_ti = kwargs['task_instance']
    # Emit one value per line, in the order: task id, DAG id, execution date.
    for attribute in ("task_id", "dag_id", "execution_date"):
        print(getattr(current_ti, attribute))
def get_task_2_log(**kwargs):
    """Print the task id, DAG id and execution date of the running task instance.

    Args:
        **kwargs: Task context; the ``task_instance`` entry is the only
            key that is read.
    """
    running_instance = kwargs['task_instance']

    task_id = running_instance.task_id
    print(task_id)

    dag_id = running_instance.dag_id
    print(dag_id)

    execution_date = running_instance.execution_date
    print(execution_date)
# initializing the default arguments
# Arguments shared by every task of the DAG below.
default_args = dict(
    start_date=datetime(2024, 1, 27),
    retries=1,  # one extra attempt after a failure
    retry_delay=timedelta(minutes=5),  # wait between attempts
)
# DAG
with DAG(
    "Get_Task_Logs",
    default_args=default_args,
    description="Get_Task_Logs",
    schedule_interval="07 06 * * *",  # cron: every day at 06:07
    start_date=None,  # no DAG-level start date; default_args carries one
) as dag:
    # Both tasks are defined inside a single task group.
    with TaskGroup("my_group", tooltip="my_group") as my_group:
        # Run get_task_1_log through a PythonOperator.
        call_task_1 = PythonOperator(
            task_id="call_task_1",
            python_callable=get_task_1_log,
            trigger_rule='one_success',
        )
        # Run get_task_2_log through a PythonOperator.
        # The task_id must be unique within the DAG: re-using "call_task_1"
        # here makes Airflow reject the DAG with a duplicate-task-id error.
        call_task_2 = PythonOperator(
            task_id="call_task_2",
            python_callable=get_task_2_log,
            trigger_rule='one_success',
        )

    # call_task_2 runs after call_task_1.
    call_task_1 >> call_task_2
Using the above code I can get the task details of a particular task instance, but the issue is that whenever I fetch the status of a task from within that same task, it reports "running". What I want is to get the task instance of task_1 inside task_2, so that I can read its information and check whether a particular task in a group has completed, and then use that condition to decide whether to proceed with other tasks.
I also want to check whether all the tasks in a task group have completed, so that I can verify in one go that the whole group has finished before proceeding with another group, rather than comparing task by task.
`task_1` and `task_2` are part of `my_group` and `my_another_group` respectively, so by using `TaskInstance(task, execution_date)` we can get the details of a particular task, such as `current_state`, `execution_date`, and more. It can be called from anywhere, whether from another function or from the same function. Note, however, that if you call the task's `current_state` from within the same task, the state will be "running", because the task is still running and has not yet completed or gone for a retry.
import os
import json
import time
import airflow
import requests
from airflow import DAG
from datetime import datetime, timedelta, timezone
from airflow.configuration import conf
from airflow.models import Variable, TaskInstance
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
def get_task_1_log(**kwargs):
    """Print the execution date and current state of the running task instance.

    Args:
        **kwargs: Task context; must contain a ``task_instance`` entry
            exposing ``execution_date`` and ``current_state()``.
    """
    own_instance = kwargs["task_instance"]

    run_date = own_instance.execution_date
    print(f"task_1 execution_date: {run_date}")

    # Queried from inside the task itself.
    own_state = own_instance.current_state()
    print("task_1 state: ", own_state)
    # ... other task_1 logic
def get_task_2_log(**kwargs):
    """Print the state of task_1 for the DAG run this task belongs to.

    The sibling task instance is looked up through the current DAG run
    instead of constructing ``TaskInstance(task, execution_date)`` by hand:
    the ``execution_date`` constructor argument is deprecated in recent
    Airflow 2 releases, and the lookup no longer depends on a module-level
    operator object.

    Args:
        **kwargs: Airflow task context; the ``dag_run`` entry is used.
    """
    dag_run = kwargs["dag_run"]
    # Tasks created inside a TaskGroup are registered with the group id as
    # a prefix, so call_task_1 in "my_group" is "my_group.call_task_1".
    task_1_instance = dag_run.get_task_instance("my_group.call_task_1")
    # The lookup yields None when the run has no such task instance.
    task_1_state = (
        task_1_instance.current_state() if task_1_instance is not None else None
    )
    print(f"task_1 status: {task_1_state}")
    # ... other task_2 logic
# Arguments shared by every task of the DAG below.
default_args = dict(
    start_date=datetime(2024, 2, 9),
    retries=1,  # one extra attempt after a failure
    retry_delay=timedelta(minutes=5),  # wait between attempts
)
with DAG(
    "Get_Task_Logs_V0",
    default_args=default_args,
    description="Get_Task_Logs",
    schedule_interval="07 06 * * *",  # cron: every day at 06:07
    start_date=None,  # no DAG-level start date; default_args carries one
    catchup=False,  # Prevent backfilling
) as dag:
    # Operators created inside the `with DAG` block are attached to this DAG
    # automatically, so no explicit `dag=` argument is needed. Likewise,
    # `provide_context` is deprecated since Airflow 2.0: the task context is
    # always passed to callables that accept **kwargs.
    with TaskGroup("my_group", tooltip="my_group") as my_group:
        call_task_1 = PythonOperator(
            task_id="call_task_1",
            python_callable=get_task_1_log,
        )

    with TaskGroup("my_another_group", tooltip="my_another_group") as my_another_group:
        call_task_2 = PythonOperator(
            task_id="call_task_2",
            python_callable=get_task_2_log,
        )

    # Everything in my_another_group runs after my_group.
    my_group >> my_another_group