I need to submit the elements of a very large dask.bag
to a non-threadsafe store, i.e. I need something like

    for x in dbag:
        store.add(x)

I cannot use compute, since the bag is too large to fit in memory.
I need something more like distributed.as_completed, but one that
works on bags, which distributed.as_completed does not.
I would probably continue to use normal compute, but add a lock:

    def commit(x, lock=None):
        # serialize access to the non-threadsafe store
        with lock:
            store.add(x)

    b.map(commit, lock=my_lock).compute()

where you might create a threading.Lock or multiprocessing.Lock,
depending on the kind of processing you're doing.
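Note that an ordinary lock won't coordinate tasks running in separate worker processes; on the distributed scheduler you could use dask.distributed.Lock instead, which is shared by name across the cluster. A minimal sketch, assuming a running Client and the same store and bag b from the question (the lock name is arbitrary):

    from dask.distributed import Client, Lock

    client = Client()            # connect to (or start) a distributed scheduler
    lock = Lock("store-commit")  # named lock shared by all workers

    def commit(x, lock=None):
        # only one task at a time may touch the non-threadsafe store
        with lock:
            store.add(x)

    b.map(commit, lock=lock).compute()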
If you want to use as_completed, you can convert your bag to futures and call as_completed on them.
    from distributed.client import futures_of, as_completed

    b = b.persist()
    futures = futures_of(b)

    for future in as_completed(futures):
        # each future holds one partition of the bag
        for x in future.result():
            store.add(x)
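as_completed can also hand results back directly via with_results=True, and releasing each future once you're done with it lets the scheduler drop that partition from memory as you go. A variant sketch of the same loop:

    from distributed.client import futures_of, as_completed

    b = b.persist()

    for future, result in as_completed(futures_of(b), with_results=True):
        for x in result:     # result is the list of elements in one partition
            store.add(x)
        future.release()     # let the cluster free that partition's memory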
You can also convert to a dataframe, which I believe iterates more sensibly:
    df = b.to_dataframe(...)

    for x in df.iteritems(...):
        ...
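If iteritems doesn't suit your dask version, one concrete way to keep memory bounded is to materialize one partition at a time. A sketch, assuming the bag's elements are dicts or tuples (which to_dataframe requires):

    df = b.to_dataframe()

    # pull one partition into pandas at a time instead of the whole frame
    for i in range(df.npartitions):
        pdf = df.get_partition(i).compute()   # a plain pandas DataFrame
        for row in pdf.itertuples(index=False):
            store.add(row)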