Search code examples

How to get in prefect core local task final state?

I have built a flow which implicitly skips running a given task if a kwarg is empty.

I use something like this within task function for skipping logic:

if kwargs.get('processors', Hierarchy()).__len__() == 0:
                    raise signals.SKIP('skipping task',

I want to build some unit tests to make sure that the final state of said task is skipped. What is the easiest way to get state at a task level?

I can see from docs how to get for a flow but not for a task.


To add to Chris's response, I used his 1st proposed option. As my flow is defined outside of the tests I created a simple function to get a set of tasks that had skipped. In the test this was compared against a list of tasks that should have skipped:

def get_skipped_tasks(flow_state):
        return set( for key, value in flow_state.result.items() if value.is_skipped())


  • There are a few ways that I'll include here for completeness; for my example I'll use this basic flow:

    from prefect import task, Flow
    from prefect.engine.signals import SKIP
    import random
    def random_number():
        return random.randint(0, 100)
    def is_even(num):
        if num % 2:
            raise SKIP("odd number")
        return True
    with Flow("dummy") as flow:
        even_task = is_even(random_number)

    Run the whole flow

    When running interactively you can always run the whole flow and access individual task states from the parent flow run state; note that when you "call" a task (e.g., is_even(random_number)) a copy is created, so you need to track these copies correctly.

    flow_state =
    assert flow_state.result[even_task].is_skipped() # for example

    Run a piece of the flow with mocked data

    When running interactively you can also pass a dictionary of task -> state that the runner will respect; these states can optionally be provided data:

    from prefect.engine.state import Success
    mocked_state = Success(result=2)
    flow_state ={random_number: mocked_state})
    assert not flow_state.result[even_task].is_skipped()

    Use a TaskRunner

    Lastly, if you want to run state-based tests on this task alone you can use a TaskRunner. This gets a little more complicated because you have to recreate the upstream dependencies using Edges.

    from prefect.engine.task_runner import TaskRunner
    from prefect.edge import Edge
    runner = TaskRunner(task=even_task)
    edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
    task_state ={edge: mocked_state})
    assert not task_state.is_skipped()