python, parallel-processing, azure-cosmosdb, prefect

Using an iterator in a Prefect task for "batching"


I am using Prefect and defining a flow to insert documents into Cosmos DB.

The problem is that the query_items() call returns an iterable, and for large containers there is no way to hold all of the entries in memory.

I believe my problem could be reduced to:

  • Given an iterator, how can I create batches to be processed (mapped) in a Prefect flow?

Example:

from prefect import Flow, task

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

The above code, or something like it, will give me an error:

prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Solution

  • You are getting this error because big_iterable_function_i_cannot_change is not defined as a task. Prefect does not actually execute a flow directly: the flow is used to build a schedule (a task graph, in Dask parlance), which is then used to execute the run (as far as I understand). Parallelization in Prefect only happens when the flow is run with a Dask executor; a sketch of attaching one follows at the end of this answer.

    Here is my take on your flow. If you cannot add the @task decorator to big_iterable_function_i_cannot_change directly, wrap the call in a task instead. Finally, I am not sure you can pass a generator to a mapped task, so the example returns a range; a sketch of one way to batch the iterator follows the example below.

    import prefect
    from prefect import Flow, Parameter, task
    
    @task
    def big_iterable_function_i_cannot_change():
        return range(5) # some large amount of work
    
    @task
    def some_prefect_batching_magic(x):
        # magic code here
        pass
    
    
    with Flow("needs-to-be-batched") as flow:
        itter_res = big_iterable_function_i_cannot_change()
        post_process_res = some_prefect_batching_magic.map(itter_res)
    
    flow.visualize()
    state = flow.run()
    
    
    flow.visualize(flow_state=state)
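
    As for the original batching question: one option is to consume the iterator inside a task and slice it into fixed-size chunks, then map over the chunks. This is only a sketch, assuming Prefect 1.x; make_batches, process_batch, and batch_size are illustrative names I made up, and the list of batches still has to fit in memory, so it mainly helps when you want each batch processed in parallel.

    from itertools import islice

    from prefect import Flow, task

    def big_iterable_function_i_cannot_change():
        yield from range(1_000_000)  # some large amount of work

    @task
    def make_batches(batch_size=10_000):
        # Consume the generator lazily and cut it into fixed-size chunks.
        it = iter(big_iterable_function_i_cannot_change())
        batches = []
        while True:
            batch = list(islice(it, batch_size))
            if not batch:
                break
            batches.append(batch)
        return batches

    @task
    def process_batch(batch):
        # e.g. bulk-insert this batch of documents into Cosmos DB
        pass

    with Flow("needs-to-be-batched") as flow:
        batches = make_batches()
        process_batch.map(batches)

    flow.run()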
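
    To actually get parallel execution of the mapped task, attach a Dask-backed executor before running the flow. A minimal sketch, assuming Prefect 1.x, where the executors live in prefect.executors:

    from prefect.executors import LocalDaskExecutor

    # Mapped tasks are run in parallel by the local Dask scheduler
    # (threads by default; use scheduler="processes" for separate processes).
    flow.executor = LocalDaskExecutor()
    state = flow.run()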