Tags: python, python-3.x, prefect

How to handle a task failure in prefect and return SUCCESS with the on_failure param?


I have a Flow in Prefect with a task whose output is a DataFrame. In the example below, the task always fails. I would like the task to return an empty DataFrame with a state of SUCCESS using @task(on_failure=handle_task_fail). What is the correct syntax to achieve this?

from pprint import pprint
import pandas as pd

from prefect import Flow, task
from prefect.engine.signals import SUCCESS


def handle_disambig_error(task, old_state, new_state):
    if new_state.is_failed():
        new_state.result["wiki_df"] = pd.DataFrame()

        # Is this needed?
        #set state to SUCCESS
    return new_state


@task(on_failure=handle_disambig_error)
def get_wiki_resource():

    wiki_df = pd.DataFrame(
        {
            "a":[1],
            "b":[1/0]
        }
    )

    return wiki_df

with Flow("Always Fail") as flow:
    wiki_df = get_wiki_resource()

state = flow.run()
task_state = state.result[wiki_df]
pprint(task_state.result)

Traceback:

Traceback (most recent call last):
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 120, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/miniconda3/lib/python3.7/site-packages/prefect/utilities/notifications.py", line 69, in state_handler
    fn(obj, new_state)
TypeError: handle_disambig_error() missing 1 required positional argument: 'new_state'
[2020-01-28 17:39:41,759] INFO - prefect.TaskRunner | Task 'get_wiki_resource': finished task run for task with final state: 'Failed'
[2020-01-28 17:39:41,762] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Some places I searched: State Handlers, Logging with a State Handler.


Solution

  • There are two things going on here:

    1.) Generic State Handlers: these can be set via the state_handlers kwarg and will be called on every state change. A state handler is required to have a signature state_handler(task: Task, old_state: State, new_state: State) -> Optional[State] (which is the signature you are using); the Task's state after calling this handler will be the state which is returned from the handler, or the new_state if None is returned.

    2.) On Failure callbacks: the on_failure kwarg that you are using here is intended to be a convenience API for state handlers; functions which are passed to this keyword are required to have signature fn(task: Task, state: State) -> None and will only be called whenever this Task enters a Failed state. Note that on failure callbacks cannot alter a Task's state in the way that state handlers can.
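    To make the difference concrete, here is a minimal pure-Python sketch of how the two kinds of callbacks appear to be invoked, based only on the calls visible in the traceback (`handler(self.task, old_state, new_state) or new_state` and `fn(obj, new_state)`). The classes `State`, `Failed`, `Success` and the helpers `wrap_on_failure` and `run_handlers` are simplified stand-ins written for this illustration, not Prefect's real implementation, and an empty list stands in for the empty DataFrame.

```python
# Simplified stand-ins (assumptions for illustration, NOT Prefect's classes).
class State:
    def __init__(self, result=None):
        self.result = result

    def is_failed(self):
        return isinstance(self, Failed)


class Failed(State):
    pass


class Success(State):
    pass


def wrap_on_failure(fn):
    """Model of the on_failure convenience wrapper around fn(task, state)."""
    def state_handler(task, old_state, new_state):
        if new_state.is_failed():
            fn(task, new_state)  # called with TWO arguments; return value ignored
        return new_state         # the state is passed through unchanged
    return state_handler


def run_handlers(task, old_state, new_state, handlers):
    """Model of the runner: a handler's return value replaces the state."""
    for handler in handlers:
        new_state = handler(task, old_state, new_state) or new_state
    return new_state


def to_success(task, old_state, new_state):
    # Three-argument state handler: may return a replacement state.
    if new_state.is_failed():
        return Success(result=[])  # [] stands in for an empty DataFrame
    return None                    # None means "keep new_state"


calls = []

def log_failure(task, state):
    # Two-argument on_failure callback: can observe the state, not replace it.
    calls.append(type(state).__name__)


# 1) Registered as a state handler: the Failed state is replaced.
handled = run_handlers("demo", State(), Failed(), [to_success])

# 2) Registered as an on_failure callback: the state stays Failed.
logged = run_handlers("demo", State(), Failed(), [wrap_on_failure(log_failure)])

# 3) A three-argument function passed as on_failure: signature mismatch.
try:
    run_handlers("demo", State(), Failed(), [wrap_on_failure(to_success)])
    mismatch_error = None
except TypeError as exc:
    mismatch_error = exc
```

    Case 3 mirrors the `TypeError` in the question: the wrapper passes only the task and the new state, so a function that also expects `old_state` is one argument short. In this sketch the exception simply propagates; in the question's log, the task run ends in a Failed state instead.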

    In your example, you appear to be mixing the two keyword arguments. I believe the following code will do what you're expecting:

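    import pandas as pd
    from prefect import task
    from prefect.engine.state import Success


    def handle_disambig_error(task, old_state, new_state):
        # A state handler may return a replacement state: swap any Failed
        # state for a Success that carries an empty DataFrame.
        if new_state.is_failed():
            return Success(result=pd.DataFrame())
        return new_state


    @task(state_handlers=[handle_disambig_error])
    def get_wiki_resource():
        # Same body as in the question; 1/0 raises ZeroDivisionError.
        return pd.DataFrame({"a": [1], "b": [1 / 0]})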