Using the TaskFlow API, let's say I have:
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.state import State


@dag(
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
)
def myDag():
    @task()
    def getData():
        try:
            result = something_that_might_fail()
            return result
        except HorribleException as error:
            context = get_current_context()
            context['task_instance'] = State.FAILED  # Doesn't work
            return {"error": str(error)}

    @task()
    def transform(data_dict: dict):
        for data in data_dict:
            print(data)
        ...

    transform(getData())


run = myDag()
For monitoring purposes I want to mark the getData task as failed, but I still want to pass a result to transform. I tried to use the context to set the task instance state, but it doesn't seem to work. It seems there should be a better approach, but I don't see it.
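You can change the state of the task, but not from within the task itself; you have to do it from another task. As far as I can tell, once getData exits successfully (returns without raising), Airflow records its state as "success", so a state set while it was still running does not stick.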
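To make that reasoning concrete, here is a toy, pure-Python model of the lifecycle. It is not Airflow code: `ToyRunner`, `State`, `get_data` and `transform` are hypothetical stand-ins that only mimic the rule described above (a normal return is recorded as success). It shows why a state written from inside the task is lost, while one written by a later task survives.

```python
from enum import Enum
from typing import Any, Callable, Dict


class State(Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class ToyRunner:
    """Hypothetical, simplified task runner; NOT Airflow's implementation."""

    def __init__(self) -> None:
        self.states: Dict[str, State] = {}

    def run(self, task_id: str, fn: Callable[["ToyRunner"], Any]) -> Any:
        self.states[task_id] = State.RUNNING
        try:
            result = fn(self)
        except Exception:
            self.states[task_id] = State.FAILED
            raise
        # A normal return is recorded as success, overwriting whatever the
        # task wrote about its own state while it was running.
        self.states[task_id] = State.SUCCESS
        return result


def get_data(runner: ToyRunner) -> dict:
    runner.states["getData"] = State.FAILED  # set from inside the task: lost
    return {"error": "boom"}


def transform(runner: ToyRunner, payload: dict) -> None:
    if "error" in payload:
        runner.states["getData"] = State.FAILED  # set from a later task: kept


runner = ToyRunner()
payload = runner.run("getData", get_data)
state_after_get_data = runner.states["getData"]
runner.run("transform", lambda r: transform(r, payload))

print(state_after_get_data.value)      # success
print(runner.states["getData"].value)  # failed
```

In the toy model the later task can overwrite the earlier task's state because the earlier task has already finished and the runner will not touch its record again.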
So instead, have the transform task change the state of getData, as follows:
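from airflow.decorators import task
from airflow.models import DagRun
from airflow.operators.python import get_current_context
from airflow.utils.state import TaskInstanceState

@task()
def transform(data_dict: dict):
    context = get_current_context()
    dag_run: DagRun = context['dag_run']
    # task_id defaults to the decorated function's name, so it is "getData", not "getdata"
    dag_run.dag.set_task_instance_state(
        task_id="getData", state=TaskInstanceState.FAILED, run_id=dag_run.run_id,
    )
    # ... then process data_dict as before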
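As written, transform marks getData as failed on every run, even when getData succeeded. One way to guard that, assuming getData keeps returning the question's `{"error": ...}` payload on failure, is a small check like the hypothetical `error_from_payload` below. The Airflow call itself stays as in the answer and is shown only as a comment, because it needs a live task context.

```python
from typing import Any, Optional


def error_from_payload(payload: Any) -> Optional[str]:
    """Return the error message if getData returned {"error": ...}, else None.

    Hypothetical helper: it assumes the error payload shape used in the
    question's getData ({"error": str(error)}).
    """
    if isinstance(payload, dict) and "error" in payload:
        return str(payload["error"])
    return None


# Inside transform (Airflow-specific part, needs a running task context):
#     if error_from_payload(data_dict) is not None:
#         dag_run.dag.set_task_instance_state(
#             task_id="getData", state=TaskInstanceState.FAILED, run_id=dag_run.run_id,
#         )
```

One caveat of this design: if a successful result can itself be a dict with an "error" key, use a more distinctive marker so a valid result is not mistaken for a failure.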