I am using Prefect to define a flow that inserts documents with Cosmos DB.
The problem is that query_items() returns an iterable, and for large containers there is no way to hold all entries in memory.
I believe my problem can be reduced to:
given an iterator, how can I create batches to be processed (mapped) in a Prefect flow?
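In plain Python, the reduced problem (cutting an iterator into batches without materializing it) can be sketched with itertools.islice. This is a minimal stdlib-only sketch, assuming fixed-size batches are enough; `batched` is a hypothetical helper name (Python 3.12+ also ships itertools.batched, which yields tuples rather than lists). It does not show how the batches would then be fed to a Prefect task.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield lists of at most `size` items, pulled lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    while True:
        # islice consumes at most `size` items from the shared iterator.
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def big_iterable_function_i_cannot_change():
    yield from range(1000000)  # some large amount of work


# Only the current batch of 1000 items is held in memory.
first = next(batched(big_iterable_function_i_cannot_change(), 1000))
print(len(first), first[0], first[-1])  # 1000 0 999
```

Because the helper keeps pulling from one iterator, the source is read exactly once, front to back.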
Example:
from prefect import Flow, task

def big_iterable_function_i_cannot_change():
    yield from range(1000000)  # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass

with Flow("needs-to-be-batched") as flow:
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

flow.run()
The above code, or something like it, gives me this error:
prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
You are getting this error because you are not defining big_iterable_function_i_cannot_change as a task. Prefect does not actually execute a flow directly. The flow is used to build a task graph (in Dask parlance), which is then used to execute the flow (as far as I understand). Parallelization in Prefect only happens when it is used with a Dask-based executor (LocalDaskExecutor or DaskExecutor).
Here is my take on your flow. If you cannot add the task decorator to big_iterable_function_i_cannot_change, wrap it in a task. Finally, I am not sure you can pass a generator to a mapped task.
from prefect import Flow, task

@task
def big_iterable_function_i_cannot_change():
    # Returns a range (sized and indexable) rather than a generator.
    return range(5)  # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass

with Flow("needs-to-be-batched") as flow:
    itter_res = big_iterable_function_i_cannot_change()
    # One mapped child task runs per element of itter_res.
    post_process_res = some_prefect_batching_magic.map(itter_res)

flow.visualize()  # needs the graphviz package installed
state = flow.run()
flow.visualize(flow_state=state)
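On the closing doubt about passing a generator to a mapped task, the plain-Python sketch below shows two generator properties that plausibly cause trouble for anything that needs to count or revisit its inputs: a generator has no len() and can only be iterated once, whereas the range returned in the code above is sized and indexable. This is a stdlib illustration only and assumes nothing about how Prefect implements map internally.

```python
def big_iterable_function_i_cannot_change():
    yield from range(5)  # stands in for the real generator

gen = big_iterable_function_i_cannot_change()

# Generators have no length, so code that counts its inputs up front fails.
try:
    len(gen)
except TypeError as exc:
    print("len() failed:", exc)

print(list(gen))  # [0, 1, 2, 3, 4]
print(list(gen))  # [] (a generator can only be consumed once)

# A range is a sized, indexable, re-iterable sequence.
r = range(5)
print(len(r), r[3], list(r) == list(r))  # 5 3 True
```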