calculating many dask delayed objects together; message as soon as each is finished?


I have a number of dask delayed objects that I am computing together, resulting in files being written to disk, such as in this toy example:

import xarray as xr
import dask.array as da

objs = []
for i in range(10):
    ds = xr.Dataset({"x": (("a",), da.arange(10)*i)})
    objs.append(ds.to_netcdf(f"/tmp/test{i:d}.nc", compute=False))
da.compute(objs)

Is there a way to execute some arbitrary code for each individual delayed object, as soon as computation has completed successfully?

Motivation: in our real case, we are producing on the order of 100 files, each of which may take between 10 seconds and 8 minutes to produce. We would like to ship each file to the user as soon as possible. Alternatives we considered:

  • We could use inotify to monitor the directory for IN_CLOSE_WRITE events. This is not ideal, because it can't tell if writing was successful or not, so we would need another step to check if the file appears correct.
  • We could wait until da.compute is complete and then ship all files. This is our current approach, but it is not ideal, because we wait for the 8-minute product to complete before shipping all products at once, leading to a needless wait and a jam in the network and the client application.
  • Ideally, dask would send a message or execute a piece of code for each completed task, so that we can ship each file to the user as soon as it is completed.

Is there a way to achieve this?

Maybe we could wrap everything in a dask.delayed function that has the to_netcdf call and the callback? But considering my poor understanding of dask, I'm not sure what repercussions that might have.


Solution

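  • The simple way to do this is to have a second delayed function which depends on the output of the to_netcdf objects:

    import dask

    @dask.delayed
    def on_done(result):
        # notify() is a placeholder for whatever code ships the file
        notify(result)

    # one callback task per write task; each runs once its write has finished
    objs2 = [on_done(obj) for obj in objs]
    dask.compute(objs2)
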

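    The result passed to on_done will be whatever the computed to_netcdf object returns, hopefully the name of the file created. If it turns out to be something else (for example None), pass the filename to on_done as a second argument instead.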
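
In case the computed to_netcdf object does not return the filename, here is a minimal sketch of the same idea with the filename passed explicitly. It assumes the default local scheduler. `write_file` is a hypothetical stand-in for `ds.to_netcdf(filename, compute=False)`, and `ship` is a hypothetical placeholder for the real delivery step; neither name comes from the question.

```python
import dask

shipped = []  # records which files were "shipped", for illustration only


def ship(filename):
    # Hypothetical callback: replace with the code that delivers the file.
    shipped.append(filename)


@dask.delayed
def write_file(filename):
    # Hypothetical stand-in for ds.to_netcdf(filename, compute=False).
    # It deliberately returns None to show that the callback does not
    # need the write task to return anything useful.
    return None


@dask.delayed
def on_done(result, filename):
    # `result` is only accepted to make this task depend on the write task;
    # the filename comes in as an ordinary argument.
    ship(filename)
    return filename


filenames = [f"/tmp/test{i:d}.nc" for i in range(3)]
writes = [write_file(fn) for fn in filenames]
callbacks = [on_done(w, fn) for w, fn in zip(writes, filenames)]

# dask.compute returns a tuple with one entry per argument passed in.
(done,) = dask.compute(callbacks)
```

Because each `on_done` task depends on its own write task, it can run as soon as that write has finished, without waiting for the slower files. If a write task raises, its callback is not executed, so `ship` is only called for writes that completed without an exception. The order in which files are shipped depends on the scheduler and is not guaranteed.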