Using an iterator in Prefect task "batching"

I am using Prefect and defining a flow to insert documents into Cosmos DB.

The problem is that the query_items() call returns an iterable, and for large containers there is no way to hold all entries in memory.

I believe my problem could be reduced to:

  • given an iterator, how can I create batches to be processed (mapped) in a prefect flow?


def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

def some_prefect_batching_magic(x):
    # magic code here

with Flow("needs-to-be-batched"):

The above code, or something like it, gives me an error:

prefect.FlowRunner | Flow run FAILED: some reference tasks failed.


  • You are getting this error because big_iterable_function_i_cannot_change is not defined as a task. Prefect does not execute the body of a flow directly. As far as I understand, the flow is used to build a schedule (in Dask parlance), which is then used to execute the tasks. Parallelization in Prefect only happens when it is used with the Dask executor.

    Here is my take on your flow. If you cannot add the task decorator to big_iterable_function_i_cannot_change, wrap it in a task instead. Finally, I am not sure you can pass a generator to a mapped task.

    import prefect
    from prefect import Flow, Parameter, task

    @task
    def big_iterable_function_i_cannot_change():
        return range(5)  # some large amount of work

    @task
    def some_prefect_batching_magic(x):
        # magic code here

    with Flow("needs-to-be-batched") as flow:
        itter_res = big_iterable_function_i_cannot_change()
        post_process_res =
    state =
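
Since it is unclear whether a generator can be passed to a mapped task, one option is to cut the iterator into bounded lists first, so that each unit of work is an ordinary list. The sketch below uses only the standard library and does not call Prefect at all. iter_batches and process_batch are hypothetical names introduced here for illustration, and the batch size of 10,000 is an arbitrary assumption.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Lazily yield lists of at most batch_size items from any iterable.

    Hypothetical helper: only one batch is held in memory at a time.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls at most batch_size items without consuming the rest.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the underlying iterator is exhausted
        yield batch


def big_iterable_function_i_cannot_change():
    yield from range(1000000)  # some large amount of work


def process_batch(batch: List[int]) -> int:
    """Hypothetical stand-in for the real per-batch work (e.g. an insert)."""
    return sum(batch)


if __name__ == "__main__":
    # Consume the generator one batch at a time; nothing is fully materialized.
    results = [
        process_batch(batch)
        for batch in iter_batches(big_iterable_function_i_cannot_change(), 10_000)
    ]
    print(len(results), "batches processed")
```

Inside a flow, the body of process_batch could live in a task and the loop over iter_batches could run inside a single wrapping task. Note that collecting every batch into one list in order to map over it would bring the whole data set back into memory, so looping batch by batch is the safer default if memory is the constraint.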