Tags: pipeline, dask, dask-distributed

Dask: build and execute efficient pipeline


I am using dask to apply several transformations to thousands of images. For each image, there are 5 transformations that need to be done sequentially. I would like to distribute this pipeline on an HPC cluster. I have 200 CPUs available, so I would like to be able to do something like:

input_files = [...]  # a list of 2000 files
futures = client.map(transforms, input_files)
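
Here transforms would chain the five steps for one image, roughly like this (load_image, save_output and the step functions are placeholder names):

def transforms(path):
    image = load_image(path)  # read one input file
    for step in (step1, step2, step3, step4, step5):
        image = step(image)   # the five sequential transformations
    save_output(image, path)  # write the result as soon as it is ready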

Ideally, dask distributed would run 200 transformations at a time, write the desired output as soon as an image's processing finishes, and so on.

However, it does not seem to work exactly this way. I observe that dask tends to start 200 tasks but only run the first 3 steps on them, then start e.g. 50 other tasks, in a seemingly random processing order.

What would be a good way of using dask distributed here? Should I, for example, call client.map 10 times (200 tasks each time)? Is there a way to "force" dask to run one image's pipeline from start to finish before launching a new one?


Solution

  • What would be a good way of using dask distributed here? Should I, for example, call client.map 10 times (200 tasks each time)? Is there a way to "force" dask to run one image's pipeline from start to finish before launching a new one?

    One option is to assign higher priority to earlier tasks. client.map assigns a single priority value to the whole batch, so to give each task its own priority, use client.submit:

    futures = []
    for n, f in enumerate(input_files):
        # submit files one by one so each task gets its own priority
        futures.append(client.submit(transforms, f, priority=-n))
    

    In this case, later tasks have lower priority, so they should be completed after the higher-priority tasks. Since you have multiple transformation steps, you will also want to assign the later transformation functions a higher priority value, so that images already in the pipeline are finished before new ones are started, as in the sketch below.
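
    A minimal sketch of that idea, assuming the five per-image steps are separate functions step1 ... step5 (placeholder names) and that each one takes the previous one's result. The priority grows with the step index, so later steps always outrank earlier ones, and within a step earlier files still come first:

    # sketch only: `client` and `input_files` are the objects from the question,
    # and step1 ... step5 stand in for the five per-image transform functions
    steps = [step1, step2, step3, step4, step5]
    n_files = len(input_files)

    futures = []
    for n, path in enumerate(input_files):
        result = path
        for s, step in enumerate(steps):
            # a later step always outranks an earlier one; within the same
            # step, earlier files (smaller n) run first
            result = client.submit(step, result, priority=s * n_files - n)
        futures.append(result)

    Because each client.submit call receives the previous step's future, dask still runs the five steps of one image in order; the priorities only decide which of the currently ready tasks the workers pick up next.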