
Creating distributed dask arrays


I'm interested in making a distributed dask array out of a bunch of netcdf files I have lying around. I started down the path outlined in "Distributed Dask arrays", but got tripped up by the deprecation of distributed.collections.

What is the best way to create a distributed dask array now? I have my dask-scheduler and dask-worker processes running, and I can successfully execute the following:

from distributed import Client, progress
client = Client('scheduler-address:8786')            # connect to the running scheduler
futures = client.map(load_netcdf, filenames)         # load_netcdf and filenames are my own
progress(futures)

What next?


Solution

  • Use XArray

    First, if you have many NetCDF files, then you should take a long look at the XArray package, which wraps Dask.array and manages all of the NetCDF metadata conventions.

    http://xarray.pydata.org/en/stable/

    In particular, I think you want the open_mfdataset function.
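
    A minimal sketch, assuming the files align along a shared dimension and contain a variable named temperature (the glob pattern and variable name are placeholders, not from the original question):

    import xarray as xr

    # Open many NetCDF files as one dataset backed by dask arrays.
    ds = xr.open_mfdataset('/path/to/files/*.nc', combine='by_coords')
    arr = ds['temperature'].data   # the underlying dask array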

  • By Hand

    If you want to build a dask.array by hand using the techniques in that blogpost, then you should use the dask.delayed interface and the da.from_delayed function.

    http://dask.pydata.org/en/latest/array-creation.html#using-dask-delayed
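
    As a rough sketch, assuming each file holds a (100, 200) float64 array in a variable named temperature, and using netCDF4 as the reader (all of these are assumptions, not part of the question):

    import dask
    import dask.array as da
    import numpy as np
    from netCDF4 import Dataset

    @dask.delayed
    def load_netcdf(filename):
        # Read one file into a NumPy array; 'temperature' is a placeholder name.
        with Dataset(filename) as f:
            return np.asarray(f.variables['temperature'][:])

    filenames = ['data-000.nc', 'data-001.nc']   # placeholder paths
    chunks = [da.from_delayed(load_netcdf(fn), shape=(100, 200), dtype='f8')
              for fn in filenames]
    array = da.concatenate(chunks, axis=0)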

    If you want to use Futures as in that blogpost, that's fine; da.from_delayed will accept a Future in place of a delayed value.

    import dask.array as da

    # Each future becomes one chunk; supply the shape and dtype it will have.
    array_chunks = [da.from_delayed(future, shape=..., dtype=...)
                    for future in futures]
    array = da.concatenate(array_chunks, axis=0)
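
    Either way, the result is an ordinary dask array, and computations on it run on your cluster because a Client is connected, e.g.:

    array.mean(axis=0).compute()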