
How to get a task's final state when running locally in Prefect Core?


I have built a flow which implicitly skips running a given task if a kwarg is empty.

I use something like this inside the task function for the skipping logic:

if len(kwargs.get('processors', Hierarchy())) == 0:
    raise signals.SKIP('skipping task', result=Prediction())

I want to build some unit tests to make sure that the final state of this task is skipped. What is the easiest way to get the state at the task level?

I can see from the docs how to get the state for a flow, but not for a task.

Update

To add to Chris's response, I used his first proposed option. As my flow is defined outside of the tests, I created a simple function that returns the set of names of the tasks that were skipped. In the test, this is compared against the set of tasks that should have been skipped:

def get_skipped_tasks(flow_state):
    return set(key.name for key, value in flow_state.result.items() if value.is_skipped())
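
As a rough sketch of how that comparison might look, the helper can be exercised against stand-in objects. FakeTask, FakeState and fake_flow_state below are hypothetical names, not part of Prefect; they only mimic the .name, .is_skipped() and .result attributes the helper relies on. In a real test, flow_state would come from flow.run() instead.

```python
from types import SimpleNamespace


def get_skipped_tasks(flow_state):
    """Return the names of all tasks whose final state is skipped."""
    return {task.name for task, state in flow_state.result.items() if state.is_skipped()}


# Hypothetical stand-ins (not Prefect classes) so the helper can be
# checked without actually running a flow.
class FakeTask:
    def __init__(self, name):
        self.name = name


class FakeState:
    def __init__(self, skipped):
        self._skipped = skipped

    def is_skipped(self):
        return self._skipped


# Mimics the shape of a flow run state: .result maps task -> task state.
fake_flow_state = SimpleNamespace(
    result={
        FakeTask("random_number"): FakeState(skipped=False),
        FakeTask("is_even"): FakeState(skipped=True),
    }
)

# Compare the tasks that skipped against the tasks that should have skipped.
expected_skipped = {"is_even"}
assert get_skipped_tasks(fake_flow_state) == expected_skipped
```

Comparing sets rather than lists keeps the check independent of the order in which the tasks appear in the result mapping.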

Solution

  • There are a few ways to do this, and I'll include them all here for completeness. For my examples I'll use this basic flow:

    from prefect import task, Flow
    from prefect.engine.signals import SKIP
    import random
    
    
    @task
    def random_number():
        return random.randint(0, 100)
    
    @task
    def is_even(num):
        # num % 2 is truthy for odd numbers, so odd inputs end in a Skipped state
        if num % 2:
            raise SKIP("odd number")
        return True
    
    with Flow("dummy") as flow:
        even_task = is_even(random_number)
    

    Run the whole flow

    When running interactively you can always run the whole flow and access individual task states from the parent flow run state. Note that when you "call" a task (e.g., is_even(random_number)) a copy is created, so you need to keep a reference to that copy (even_task here) and use it, not the original is_even, to look up the state.

    flow_state = flow.run()
    
    assert flow_state.result[even_task].is_skipped() # for example
    

    Run a piece of the flow with mocked data

    When running interactively you can also pass a dictionary of task -> state that the runner will respect, so the listed tasks are treated as already being in the given state. These states can optionally carry data:

    from prefect.engine.state import Success
    
    mocked_state = Success(result=2)
    
    
    flow_state = flow.run(task_states={random_number: mocked_state})
    assert not flow_state.result[even_task].is_skipped()
    

    Use a TaskRunner

    Lastly, if you want to run state-based tests on this task alone, you can use a TaskRunner. This is a little more complicated because you have to recreate the upstream dependencies yourself using Edges.

    from prefect.engine.task_runner import TaskRunner
    from prefect.edge import Edge
    
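    runner = TaskRunner(task=even_task)
    # the edge's key must match the name of the downstream task's argument
    edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
    
    # mocked_state is the Success(result=2) state defined in the previous example
    task_state = runner.run(upstream_states={edge: mocked_state})
    assert not task_state.is_skipped()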