Tags: python-3.x, state, airflow-2.x, raise

Airflow 2.3 - Marking a task instance as failed without raising


Using the TaskFlow API, let's say I have:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@dag(
    schedule_interval=None,
    start_date=datetime(2021, 1, 1)
)
def myDag():
    @task()
    def getData():
        try:
            result = something_that_might_fail()
            return result
        except HorribleException as error:
            context = get_current_context()
            context['task_instance'] = State.FAILED # Doesn't work
            return {"error": str(error)}

    @task()
    def transform(data_dict: dict):
        for data in data_dict:
            print(data)
        ...


    transform(getData())

run = myDag()

For monitoring purposes I want to mark the getData task as failed, but I still want to pass a result to transform. I tried to use the context to get access to the task instance state, but it doesn't seem to work. It seems there should be a better approach, but I don't see it.


Solution

  • You can change the state of the task, but not from within the task itself; it has to be done from another task. I guess that once "getData" returns without raising, its state ends up as "success".

    You can pass information to the "transform" task so that it changes the state of "getData", as follows:

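    from airflow.models import DagRun
    from airflow.utils.state import TaskInstanceState

    @task()
    def transform(data_dict: dict):
        context = get_current_context()
        dag_run: DagRun = context['dag_run']
        # task_id is case-sensitive and defaults to the function name, "getData"
        dag_run.dag.set_task_instance_state(
            task_id="getData", state=TaskInstanceState.FAILED, run_id=dag_run.run_id,
        )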
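The snippet above marks the upstream task as failed on every run. Here is a minimal sketch of a check that could gate that call, assuming getData signals a failure by returning a dict with an "error" key, as in the question. The names `ERROR_KEY` and `split_payload` are hypothetical helpers for illustration, not part of Airflow.

```python
from typing import Any, Dict, Tuple

# Hypothetical marker key; matches the {"error": ...} dict returned by getData
# in the question when something_that_might_fail() raises.
ERROR_KEY = "error"


def split_payload(payload: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]:
    """Return (upstream_failed, data) for a payload received from getData.

    upstream_failed is True when the payload carries the error marker;
    data is a copy of the payload without that marker.
    """
    upstream_failed = ERROR_KEY in payload
    data = {key: value for key, value in payload.items() if key != ERROR_KEY}
    return upstream_failed, data
```

Inside transform, this could be used as `failed, data = split_payload(data_dict)`, calling `set_task_instance_state` only when `failed` is true and then processing `data` as usual.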
    
