Tags: dask, dask-distributed, dask-dataframe

Get individual dask dataframe partition status


I'd like to run an asynchronous dask dataframe computation with ddf.persist() and then be able to track the status of individual partitions. The goal is to access partial results in a non-blocking way.

Here is the desired pseudocode:

ddf = ddf.persist()
# __dask_status__ is a hypothetical attribute illustrating the desired API
if ddf.partitions[0].__dask_status__ == 'finished':
    # Partial, non-blocking result access
    df = ddf.partitions[0].compute()

Using dask futures works well, but submitting many individual partitions is very slow compared to a single ddf.persist(), and having one future per partition breaks the dashboard's "Groups" tab by showing too many blocks.

futures = list(map(client.compute, ddf.partitions))

[Screenshot: broken dask dashboard "Groups" tab]


Solution

  • The function you probably want is distributed.futures_of, which lists the futures backing a collection. You can either examine this list yourself, checking the status of each future, or use distributed.as_completed in a for-loop to process the partitions as they become available. The keys of the futures are tuples like (collection-name, partition-index), so you know which partition each future belongs to. A sketch of both approaches follows this answer.

    The reason ddf.partitions[i] (or looping over the partitions with client.compute) doesn't work well is that it creates a new graph for each partition, so you end up submitting much more to the scheduler than the single call to .persist() does.
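
    Here is a minimal sketch of both approaches, assuming a local distributed cluster; the example dataframe, the number of partitions, and the variable names are illustrative:

    import pandas as pd
    import dask.dataframe as dd
    from dask.distributed import Client, as_completed, futures_of

    client = Client()  # local cluster, for illustration

    # Illustrative dataframe with 4 partitions
    ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

    # One call to persist: a single graph submission, one future per partition
    ddf = ddf.persist()
    futures = futures_of(ddf)

    # Option 1: inspect the futures' statuses yourself (non-blocking)
    for fut in futures:
        name, index = fut.key  # key is (collection-name, partition-index)
        if fut.status == "finished":
            df = fut.result()  # the pandas partition, already computed

    # Option 2: process partitions as they complete
    for fut in as_completed(futures):
        _, index = fut.key
        partition = fut.result()
        print(f"partition {index} finished with {len(partition)} rows")

    Because all the futures come from the single .persist() graph, the dashboard's "Groups" tab should stay readable, unlike the one-future-per-partition submission above.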