Tags: dask, dask-distributed, dask-delayed

How do I run a group of nodes together with Dask?


I have an image processing graph and I want to process many images in a batch. My graph looks like the following: [image: My graph made from delayed functions]

When I run the graph, the Bokeh dashboard shows the execution order like this: [image: How I observe it to run in Bokeh]

This causes my machines to run out of memory and crash, because the output of each load-image task is megabytes of image data. I would like the graph to run like this instead, since the result of Save result is very small and should be fine to keep in memory: [image: How I want it to run in Bokeh]

How can I do this with dask?

The "Customizing Optimization" documentation looks relevant: I could possibly fuse the middle nodes. Is this the best way?


Solution

  • Dask prefers to execute tasks that allow memory to be freed, which ought to mean depth-first execution in your example. However, it also runs tasks in parallel, so several images can be loaded at the same time; the easiest fix may be to use just one worker.

    Indeed, the linear chains in the graph make a good case for fusing. You can apply the optimization yourself (dask.optimization.inline_functions and dask.optimization.fuse; you shouldn't need a custom optimization), or you could write a function that explicitly calls each sub-task in turn within a single task (save(process(load(..)))).
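
A minimal sketch of the second option, wrapping the whole chain in one delayed task. The load, process and save functions here are hypothetical stand-ins (the question does not show the real ones), and run_batch is just an illustrative helper name:

```python
def load(path):
    """Hypothetical stand-in for loading a large image from disk."""
    return [len(path)] * 1000  # pretend this is megabytes of pixel data


def process(image):
    """Hypothetical stand-in for the expensive processing step."""
    return [2 * px for px in image]


def save(image):
    """Hypothetical stand-in for saving; returns only a small summary."""
    return sum(image)


def load_process_save(path):
    # The large intermediates exist only inside this one call, so they are
    # never stored as separate task results and are freed when it returns.
    return save(process(load(path)))


def run_batch(paths):
    import dask  # imported here so the helpers above stay plain Python

    # One task per image instead of three: only the small result of save()
    # is kept by the scheduler.
    tasks = [dask.delayed(load_process_save)(p) for p in paths]
    return dask.compute(*tasks)  # tuple with one small result per path
```

Calling run_batch(["a.png", "bb.png"]) would then schedule two tasks, each of which runs load, process and save back to back. The trade-off is that Dask can no longer see or parallelise the individual steps within one image, which is fine here since the steps form a linear chain anyway.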