python, parallel-processing, dask, dask-distributed, dask-delayed

How to parallelize a loop with Dask?


I find the Dask documentation quite confusing. Let's say I have a function:

import random
import dask

def my_function(arg1, arg2, arg3):
    val1 = random.uniform(arg1, arg2)
    val2 = random.uniform(arg2, arg3)
    return val1 + val2

some_list = []
for i in range(100):
    # arg1, arg2, and arg3 are never defined anywhere in this snippet
    some_num = dask.delayed(my_function)(arg1, arg2, arg3)
    some_list += [some_num]

computed_list = dask.compute(*some_list)

This is going to fail because my_function() never receives its three arguments: arg1, arg2, and arg3 are not defined anywhere in the snippet.

How can I parallelize this snippet of code with Dask?


EDIT:

It seems to work if you put a @dask.delayed decorator on top of the function definition and call it normally, but now the line calling the .compute() method throws:

KilledWorker: ('my_function-ac3c88f1-53f8-4d36-a520-ff8c40c6ee61', <Worker 'tcp://127.0.0.1:35925', name: 1, memory: 0, processing: 10>)
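
For context, KilledWorker is raised by the dask.distributed scheduler: it means the cluster killed the worker that was running the task (most often because the worker exceeded its memory budget), not that the delayed function itself is wrong. Below is a minimal sketch of two common workarounds, assuming the snippet above and a single local machine; the scheduler= keyword and the Client parameters are standard Dask API, but the limit values shown are placeholders:

import dask
from dask.distributed import Client

# Workaround 1: bypass the distributed cluster and run on the local
# threaded scheduler, where there are no workers to be killed.
computed_list = dask.compute(*some_list, scheduler="threads")

# Workaround 2: start a local cluster with explicit limits so each
# worker has a known memory budget; once the client exists,
# dask.compute uses it by default.
client = Client(n_workers=2, threads_per_worker=1, memory_limit="2GB")
computed_list = dask.compute(*some_list)
client.close()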

Solution

  • I build a graph first and then call compute on it:

    import random
    import dask
    
    @dask.delayed
    def my_function(arg1, arg2, arg3):
        val1 = random.uniform(arg1, arg2) 
        val2 = random.uniform(arg2, arg3)
        return val1 + val2
    
    arg1 = 1
    arg2 = 2
    arg3 = 3
    
    some_list = []
    for i in range(10):
        some_num = my_function(arg1, arg2, arg3)
        some_list.append(some_num)
    
    graph = dask.delayed(some_list)  # one Delayed wrapping the whole list
    # graph.visualize()
    computed_list = graph.compute()
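
    A follow-up note: wrapping some_list in one more delayed is optional. Calling dask.compute on the individual delayed objects, as the question originally attempted, is equivalent once the function and arguments are defined correctly; it simply returns a tuple rather than a list:

    # Equivalent: dask.compute unpacks the delayed objects and
    # returns a tuple of the 10 computed values.
    computed_list = dask.compute(*some_list)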