I'd like to run an asynchronous Dask DataFrame computation with ddf.persist() and then be able to track the status of an individual partition. The goal is to access partial results in a non-blocking way.
Here is the desired pseudocode:
    ddf = ddf.persist()
    # __dask_status__ is a hypothetical attribute; this API does not exist
    if ddf.partitions[0].__dask_status__ == 'finished':
        # Partial non-blocking result access
        df = ddf.partitions[0].compute()
Using Dask futures works, but submitting many individual partitions is very slow compared to a single ddf.persist(), and having one future per partition breaks the dashboard's "Groups" tab by showing too many blocks.

    futures = list(map(client.compute, ddf.partitions))
The function you probably want is distributed.futures_of, which lists the running futures of a collection. You can either examine this list yourself, checking the status of each future, or use distributed.as_completed with a for-loop to process the partitions as they become available. The keys of the futures look like (collection-name, partition-index), so you know which partition each future belongs to.
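For example, here is a minimal sketch of that pattern. The Client setup and the toy dataframe are illustrative assumptions; futures_of and as_completed are the actual distributed API:

    import pandas as pd
    import dask.dataframe as dd
    from distributed import Client, as_completed, futures_of

    client = Client()  # a local cluster, for illustration

    # A toy dataframe; any dask DataFrame works the same way
    ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=10)

    # One submission of the whole graph
    ddf = ddf.persist()

    # One future per partition, extracted from the persisted collection
    futures = futures_of(ddf)

    # Option 1: inspect the statuses yourself, without blocking
    finished = [f for f in futures if f.status == "finished"]

    # Option 2: handle each partition as soon as it completes
    for future in as_completed(futures):
        name, index = future.key   # key is (collection-name, partition-index)
        partial = future.result()  # the pandas DataFrame for that partition
        print(f"partition {index}: {len(partial)} rows")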
The reason ddf.partitions[i] (or looping over these with list) doesn't work well is that it creates a new graph for each partition, so you end up submitting much more to the scheduler than the single call to .persist() does.
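To make the difference concrete, here is a sketch of the two submission patterns side by side (variable names carried over from the snippets above):

    # Per-partition submission: each slice carries its own graph, so
    # overlapping graph state is sent to the scheduler once per partition
    slow_futures = [client.compute(p) for p in ddf.partitions]

    # Persist-then-extract: the graph is submitted once, and futures_of
    # simply hands back the per-partition futures that already exist
    ddf = ddf.persist()
    futures = futures_of(ddf)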