Tags: dask, concurrent.futures, dask-distributed

Iterate sequentially over a dask bag


I need to submit the elements of a very large dask.bag to a non-threadsafe store, i.e. I need something like

for x in dbag:
    store.add(x)

I cannot use compute, since the bag is too large to fit in memory. I need something like distributed.as_completed, but one that works on bags, which distributed.as_completed does not.


Solution

  • I would probably continue to use normal compute, but add a lock

    def commit(x, lock=None):
        with lock:                      # serialize access to the non-threadsafe store
            store.add(x)

    b.map(commit, lock=my_lock).compute()   # map is lazy; compute() actually runs it
    

    Here my_lock would be a threading.Lock or a multiprocessing.Lock, depending on the kind of processing you're doing.
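
    A minimal self-contained sketch of this approach, assuming the threaded scheduler (the Store class here is just a stand-in for your actual store; with the distributed scheduler you would want a cluster-wide lock such as distributed.Lock rather than a threading.Lock):

    import threading
    import dask.bag as db

    class Store:                              # stand-in for a non-threadsafe store
        def __init__(self):
            self.items = []
        def add(self, x):
            self.items.append(x)

    store = Store()
    my_lock = threading.Lock()                # a threading.Lock only works in-process

    def commit(x, lock=None):
        with lock:                            # one task at a time touches the store
            store.add(x)

    b = db.from_sequence(range(1000), npartitions=10)
    b.map(commit, lock=my_lock).compute(scheduler="threads")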

    If you want to use as_completed, you can convert your bag to futures and call as_completed on them.

    from distributed.client import futures_of, as_completed

    b = b.persist()            # start computing all partitions on the cluster
    futures = futures_of(b)    # one concrete future per partition

    for future in as_completed(futures):
        for x in future.result():    # the elements of one finished partition
            store.add(x)
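
    Note that as_completed yields futures in completion order, not partition order; if the store must see elements in the bag's original order, iterate over the futures list directly and call future.result() on each instead. As an end-to-end sketch, reusing the Store stand-in from above and assuming a local cluster (with_results=True is a convenience that hands back each result alongside its future):

    import dask.bag as db
    from distributed import Client
    from distributed.client import futures_of, as_completed

    client = Client()                                # local cluster for testing
    b = db.from_sequence(range(1000), npartitions=10).persist()

    for future, result in as_completed(futures_of(b), with_results=True):
        for x in result:                             # one partition's elements
            store.add(x)                             # only this loop touches the store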
    

    You can also convert the bag to a dataframe, which I believe iterates more sensibly:

    df = b.to_dataframe(...)
    for row in df.itertuples(...):    # row-wise, computing one partition at a time
        ...
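
    As a concrete sketch of that route (assuming the bag holds flat dict records so that to_dataframe can infer the columns, and reusing the Store stand-in from above):

    import dask.bag as db

    records = [{"key": i, "value": i * 2} for i in range(1000)]
    b = db.from_sequence(records, npartitions=10)

    df = b.to_dataframe()            # columns inferred from the dict keys
    for row in df.itertuples():      # computes one partition at a time
        store.add(row.value)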