I have built a flow which implicitly skips running a given task if a kwarg is empty.
I use something like this within the task function for the skipping logic:
if len(kwargs.get('processors', Hierarchy())) == 0:
    raise signals.SKIP('skipping task', result=Prediction())
I want to build some unit tests to make sure that the final state of said task is skipped. What is the easiest way to get the state at the task level?
I can see from the docs how to get it for a flow, but not for a task.
Update
To add to Chris's response, I used his first proposed option. As my flow is defined outside of the tests, I created a simple function to get the set of tasks that were skipped. In the test this was compared against a list of tasks that should have skipped:
def get_skipped_tasks(flow_state):
    # flow_state.result maps each task to its final state
    return set(key.name for key, value in flow_state.result.items() if value.is_skipped())
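For illustration, here is a minimal sketch of how that comparison might look inside a unit test. FakeTask and FakeState are hypothetical stand-ins I made up so the sketch runs without Prefect; they are not part of Prefect, and in the real test flow_state would come from flow.run(). The task names "load" and "predict" are also just placeholders.

```python
import unittest


def get_skipped_tasks(flow_state):
    """Return the names of all tasks whose final state is skipped."""
    return {task.name for task, state in flow_state.result.items() if state.is_skipped()}


# Hypothetical stand-ins for a task and a state object, used only so this
# sketch is runnable on its own. They mimic the two attributes the helper
# relies on: task.name and state.is_skipped().
class FakeTask:
    def __init__(self, name):
        self.name = name


class FakeState:
    def __init__(self, skipped, result=None):
        self._skipped = skipped
        self.result = result

    def is_skipped(self):
        return self._skipped


class SkippedTasksTest(unittest.TestCase):
    def test_expected_tasks_are_skipped(self):
        # In a real test this would be: flow_state = flow.run(...)
        flow_state = FakeState(
            skipped=False,
            result={
                FakeTask("load"): FakeState(skipped=False),
                FakeTask("predict"): FakeState(skipped=True),
            },
        )
        expected = ["predict"]  # names of tasks that should have skipped
        self.assertEqual(get_skipped_tasks(flow_state), set(expected))
```

Converting the expected list to a set before comparing keeps the assertion independent of task ordering.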
There are a few ways that I'll include here for completeness; for my example I'll use this basic flow:
import random

from prefect import task, Flow
from prefect.engine.signals import SKIP

@task
def random_number():
    return random.randint(0, 100)

@task
def is_even(num):
    # num % 2 is truthy for odd numbers, so those are skipped
    if num % 2:
        raise SKIP("odd number")
    return True

with Flow("dummy") as flow:
    even_task = is_even(random_number)
When running interactively, you can always run the whole flow and access individual task states from the parent flow run state. Note that when you "call" a task (e.g., is_even(random_number)), a copy is created, so you need to track these copies correctly.
flow_state = flow.run()
assert flow_state.result[even_task].is_skipped() # for example
When running interactively you can also pass a dictionary of task -> state that the runner will respect; these states can optionally be provided data:
from prefect.engine.state import Success
mocked_state = Success(result=2)
flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()
Lastly, if you want to run state-based tests on this task alone, you can use a TaskRunner. This gets a little more complicated because you have to recreate the upstream dependencies using Edges.
from prefect.engine.task_runner import TaskRunner
from prefect.edge import Edge
runner = TaskRunner(task=even_task)
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()