I have a number of dask delayed objects that I am computing together, resulting in files being written to disk, such as in this toy example:
import xarray as xr
import dask.array as da

objs = []
for i in range(10):
    ds = xr.Dataset({"x": (("a",), da.arange(10) * i)})
    objs.append(ds.to_netcdf(f"/tmp/test{i:d}.nc", compute=False))
da.compute(objs)
Is there a way to execute some arbitrary code for each individual delayed object, as soon as computation has completed successfully?
Motivation: in our real case, we are producing on the order of 100 files that may take between 10 seconds and 8 minutes each to produce. We would like to ship each file to the user as soon as possible. Alternatives we considered:

- Watching for IN_CLOSE_WRITE inotify events. This is not ideal, because it can't tell whether writing was successful, so we would need another step to check that the file appears correct.
- Waiting until da.compute is complete and then shipping all files. This is our current approach, but it is not ideal, because then we are waiting for the 8-minute product to complete before shipping all products at once, leading to a needless wait and a jam in the network and the client application.

Is there a way to achieve this?
Maybe we could wrap everything in a dask.delayed function that has the to_netcdf call and the callback? But considering my poor understanding of dask, I'm not sure what repercussions that might have.
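Concretely, I imagine something like the sketch below, where ship_to_user is a placeholder for whatever per-file delivery code we run; to_netcdf with the default compute=True writes the file synchronously inside the task:

import dask
import xarray as xr
import dask.array as da

@dask.delayed
def write_and_ship(ds, fn):
    # Write the file inside this task (compute=True is the default),
    # then run the callback only if the write raised no exception.
    ds.to_netcdf(fn)
    ship_to_user(fn)  # placeholder for our per-file delivery code
    return fn

objs = [
    write_and_ship(xr.Dataset({"x": (("a",), da.arange(10) * i)}), f"/tmp/test{i:d}.nc")
    for i in range(10)
]
dask.compute(*objs)

One repercussion I can see is that each file's computation would then happen inside a single task rather than in parallel across its chunks.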
The simple way to do this is to have a second delayed function which depends on the output of the to_netcdf objects:
import dask

@dask.delayed
def on_done(result):
    # result is whatever the upstream delayed produced
    notify(result)  # notify() stands in for your per-file callback

objs2 = [on_done(obj) for obj in objs]
dask.compute(objs2)
The result will be whatever type to_netcdf returns - hopefully the filename created.
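If the delayed returned by to_netcdf turns out to yield None rather than the filename, you can sidestep that by passing the filename through explicitly. A minimal sketch, with notify again standing in for whatever per-file delivery code you run:

import dask
import xarray as xr
import dask.array as da

@dask.delayed
def on_done(result, fn):
    # `result` exists only to make this task depend on the finished write;
    # the filename is passed explicitly so nothing relies on the return value.
    notify(fn)  # stand-in for your per-file delivery code
    return fn

objs2 = []
for i in range(10):
    fn = f"/tmp/test{i:d}.nc"
    ds = xr.Dataset({"x": (("a",), da.arange(10) * i)})
    objs2.append(on_done(ds.to_netcdf(fn, compute=False), fn))

dask.compute(objs2)

Each on_done task becomes runnable as soon as its own write finishes, so files are handled one by one rather than all at the end.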