Context:
The Dask documentation states clearly that Bag.take() will only collect from the first partition. However, after a filter the first partition can be empty while the others are not.
Question:
Is it possible to use Bag.take() so that it collects from as many partitions as needed to gather n items (or as many as are available, if there are fewer than n)?
You could do something like the following:
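from toolz import concat, take
# Keep at most the first n items of a sequence
f = lambda seq: list(take(n, seq))
# The aggregate step receives one list per partition, so flatten before taking n again
b.reduction(f, lambda parts: f(concat(parts))).compute()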
This grabs the first n elements of each partition, collects them all together, and then takes the first n elements of the result.
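To make the two stages concrete, here is a minimal sketch of the same logic without Dask, modelling partitions as plain Python lists. The function name take_across_partitions and the sample data are made up for illustration; this is not how Dask implements it internally.

```python
from itertools import chain, islice

def take_across_partitions(partitions, n):
    """Model of the two-stage take: n items per partition, then n overall."""
    # Stage 1 (per partition): keep at most the first n items of each partition.
    per_partition = [list(islice(part, n)) for part in partitions]
    # Stage 2 (aggregate): flatten the per-partition lists and keep the first n.
    return list(islice(chain.from_iterable(per_partition), n))

# Hypothetical partitions after a filter; the first one ended up empty.
partitions = [[], [1, 2], [3, 4, 5]]
print(take_across_partitions(partitions, 3))   # [1, 2, 3]
print(take_across_partitions(partitions, 10))  # [1, 2, 3, 4, 5]
```

Note that every partition gets touched in stage 1, even if the first few would have been enough. Depending on your Dask version, Bag.take may also accept an npartitions argument (with npartitions=-1 meaning all partitions), which is worth checking before rolling your own.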