Tags: dask, dask-distributed, dask-delayed

Save a larger-than-memory Dask array to an HDF5 file


I need to save Dask arrays to HDF5 when using dask.distributed. My situation is very similar to the one described in this issue: https://github.com/dask/dask/issues/3351. Basically, this code works:

import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock


def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

    data.to_hdf5('test.h5', '/test')
    # equivalent explicit route via h5py + da.store:
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset)
    # f.close()


create_and_store_dask_array()

But as soon as I involve the distributed scheduler, I get a TypeError: can't pickle _thread._local objects.

import dask.array as da
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster, progress, performance_report


def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

    data.to_hdf5('test.h5', '/test')
    # this fails too:
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset)
    # f.close()


cluster = LocalCluster(n_workers=35, threads_per_worker=1)
client = Client(cluster)
create_and_store_dask_array()

I am currently working around this by submitting my computations to the scheduler in small pieces, gathering the results in memory, and saving the arrays with h5py, but this is very, very slow (a sketch of that approach follows the xarray example below). Can anyone suggest a good workaround for this problem? The issue discussion implies that xarray can take a dask array and write it to an HDF5 file, although this also seems very slow.

import xarray as xr
import netCDF4
import dask.array as da
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=35, threads_per_worker=1)
client = Client(cluster)
data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
# data.to_hdf5('test.h5', '/test')
test = xr.DataArray(data, dims=None, coords=None)
# save as hdf5 via the netCDF4/HDF5 backend
test.to_netcdf('test.h5', mode='w', format='NETCDF4')
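
For reference, here is a minimal sketch of the chunk-by-chunk workaround mentioned above (the file name test_chunked.h5 and the small worker count are arbitrary choices): each block is computed on the cluster and then written from the client process with h5py, so no h5py object ever enters the task graph.

import numpy as np
import h5py
import dask.array as da
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

# offsets of each chunk along every axis
row_off = np.concatenate(([0], np.cumsum(data.chunks[0])))
col_off = np.concatenate(([0], np.cumsum(data.chunks[1])))

with h5py.File('test_chunked.h5', 'w') as f:
    dset = f.create_dataset('/matrix', shape=data.shape, dtype=data.dtype)
    for i in range(data.numblocks[0]):
        for j in range(data.numblocks[1]):
            # compute one block on the cluster, then write it locally
            block = data.blocks[i, j].compute()
            dset[row_off[i]:row_off[i + 1], col_off[j]:col_off[j + 1]] = block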

If anyone could suggest a way to deal with this, I would be very interested in finding a solution (particularly one that does not involve adding additional dependencies).

Thanks in advance,


Solution

  • H5Py objects are not serializable, and so are hard to move between different processes in a distributed context. The explicit to_hdf5 method works around this. The more general store method doesn't special-case HDF5 in the same way.
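
As a minimal sketch of the point above (reusing the array from the question; the file and dataset names are arbitrary), prefer the explicit to_hdf5 helper over handing an open h5py Dataset to da.store:

import dask.array as da

data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

# the explicit helper manages the HDF5 file itself, so no h5py
# object needs to be moved between processes
data.to_hdf5('test.h5', '/test')

# the general-purpose route below hands an open (unpicklable) h5py
# Dataset to the task graph, which is what triggers the pickle error
# f = h5py.File('test.h5', 'w')
# dset = f.create_dataset('/test', shape=data.shape)
# da.store(data, dset)
# f.close()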