Tags: python, dask, dask-delayed

Creating a dask bag from a generator


I would like to create a dask.Bag (or dask.Array) from a list of generators. The gotcha is that the generators (when evaluated) are too large for memory.

delayed_array = [delayed(generator) for generator in list_of_generators]
my_bag = db.from_delayed(delayed_array)

NB list_of_generators is exactly that - the generators haven't been consumed (yet).

My problem is that when creating delayed_array the generators are consumed and RAM is exhausted. Is there a way to get these long lists into the Bag without first consuming them, or at least consuming them in chunks so RAM use is kept low?

NB: I could write the generators' output to disk and then load the files into the Bag, but I hoped dask could avoid that round trip.


Solution

  • A decent subset of dask.bag can work with large lazy iterators. Your approach is close, but you need to hand delayed a function that creates your generator when called, rather than the generator itself: a generator can only be consumed once, whereas a function can be re-invoked each time a partition is (re)computed.

    In [1]: import dask.bag as db
    
    In [2]: import dask
    
    In [3]: b = db.from_delayed([dask.delayed(range)(i) for i in [100000000] * 5])
    
    In [4]: b
    Out[4]: dask.bag<bag-fro..., npartitions=5>
    
    In [5]: b.take(5)
    Out[5]: (0, 1, 2, 3, 4)
    
    In [6]: b.sum()
    Out[6]: <dask.bag.core.Item at 0x7f852d8737b8>
    
    In [7]: b.sum().compute()
    Out[7]: 24999999750000000
    

    However, this can still bite you: some more complex bag operations (for example, those that shuffle data or must materialize a whole partition at once) do make partitions concrete in memory, which can blow out RAM.
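
    Applying this to the question's own setup: replace `list_of_generators` with a list of zero-argument factory functions, and wrap *calls* to those factories in `dask.delayed`. The factory names and data below are hypothetical stand-ins for whatever produces the asker's generators; this is a sketch of the pattern, not the asker's actual code.

    ```python
    import dask
    import dask.bag as db

    def make_factory(start, stop):
        # Returns a zero-argument function that builds a *fresh* generator
        # each time it is called, so nothing is consumed up front and each
        # partition can be recomputed independently by a worker.
        def factory():
            return (i * i for i in range(start, stop))
        return factory

    # Hypothetical replacement for the question's list_of_generators:
    # factories instead of live generators.
    factories = [make_factory(k * 1000, (k + 1) * 1000) for k in range(4)]

    # Wrap calls to the factories (not generators!) in delayed; the calls
    # only execute when the corresponding partition is computed.
    bag = db.from_delayed([dask.delayed(f)() for f in factories])

    print(bag.npartitions)       # 4
    print(bag.take(3))           # (0, 1, 4)
    print(bag.sum().compute())   # sum of i**2 for i in range(4000)
    ```

    Building `delayed_array` this way uses almost no memory, because each generator is created lazily inside the task that consumes it.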