I am in an HPC environment with clusters, tightly coupled interconnects, and backing Lustre filesystems. We have been exploring how to use Dask not only to provide computation but also to act as a distributed cache that speeds up our workflows. Our proprietary data format is n-dimensional and regular, and we have written a lazy reader to pass into the from_array/from_delayed methods.
We have had some issues with loading and persisting larger-than-memory datasets across a Dask cluster.
Example with HDF5:
# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
import dask.array as da
import h5py
c = Client(ip_of_scheduler)  # ip_of_scheduler is the scheduler's address string
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[list(hf.keys())[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x)  # takes ~40 minutes (~250 MB/s aggregate), far below network and filesystem capabilities
print(x[300000, :, :].compute())  # works as expected
We have also loaded datasets (using slicing, dask.delayed, and from_delayed) from some of our own file formats, and have seen similar performance degradation as the file size increases.
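The pattern looks roughly like this (read_block is a simplified stand-in for our proprietary reader, and the shapes are just illustrative):
import numpy as np
import dask
import dask.array as da

@dask.delayed
def read_block(path, start, stop):
    # Simplified stand-in for our lazy reader: returns one regular
    # n-dimensional block of the file as a NumPy array.
    return np.zeros((stop - start, 1000, 1000), dtype='float32')

blocks = [da.from_delayed(read_block('some_file', i, i + 100),
                          shape=(100, 1000, 1000), dtype='float32')
          for i in range(0, 1000, 100)]
x = da.concatenate(blocks, axis=0)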
My questions: Are there inherent bottlenecks to using Dask as a distributed cache? Will all data be forced to funnel through the scheduler? Are the workers able to take advantage of Lustre, or are functions and/or I/O serialized somehow? If this is the case, would it be more effective to not call persist on massive datasets and just let Dask handle the data and computation when it needs to?
Are there inherent bottlenecks to using Dask as a distributed cache?
There are bottlenecks to every system, but it sounds like you're not close to running into the bottlenecks that I would expect from Dask. I suspect that you're running into something else.
Will all data be forced to funnel through the scheduler?
No, workers can execute functions that load data on their own. That data will then stay on the workers.
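As a rough sketch (the scheduler address and load_partition below are placeholders): tasks run on the workers, their results stay in the workers' memory, and only small results and task metadata travel back through the scheduler.
import numpy as np
from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder address

def load_partition(i):
    # Runs on whichever worker the scheduler assigns it to; in your
    # setup this would read block i directly from Lustre.
    return np.random.random((100, 1000, 1000))

futures = client.map(load_partition, range(8))  # results stay in worker memory
sums = client.map(np.sum, futures)              # computation moves to the data
print(client.gather(sums))                      # only the small results come back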
Are the workers able to take advantage of Lustre, or are functions and/or I/O serialized somehow?
Workers are just Python processes, so if Python processes running on your cluster can take advantage of Lustre (and this is almost certainly the case), then yes, Dask workers can take advantage of Lustre.
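A quick sanity check is to run a function on every worker and confirm that they can all see the same path (using the c client from your example):
import os
c.run(os.path.exists, 'path_to_600GB_hdf5_file')
# returns a dict like {worker_address: True, ...} when every worker sees the file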
If this is the case, would it be more effective to not call persist on massive datasets and just let Dask handle the data and computation when it needs to?
This is certainly common. The tradeoff here is between distributed bandwidth to your network file system (Lustre, in your case) and the availability of distributed memory.
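In code, the two options look roughly like this (reusing the ds dataset from your example):
# Option 1: pay the load cost once and cache the array in distributed memory
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x)                    # later reads come from worker RAM

# Option 2: stay lazy and read from Lustre only when a computation needs it
y = da.from_array(ds, chunks=(100, -1, -1))
print(y[300000, :, :].compute())    # touches only the chunks this slice needs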
In your position, I would use Dask's diagnostics to figure out what is taking so much time. You might want to read through the documentation on understanding performance, and the section on the dashboard in particular; that section has a video that may be especially helpful. I would ask two questions: