How to get information about a particular Dask task


I'm running into a problem whereby my distributed cluster appears to "hang" - e.g. tasks stop processing and hence a backlog of unprocessed tasks builds up so I'm looking for some way to help debug what's going on.

On the Client there's the processing method which will tell me what tasks are currently running on each worker but AFAICS that's the only info about the tasks available on the Client object?

What I'd like is to be able to query not just processing tasks but all tasks (processed, processing, and errored), and for each task to get some statistics such as submitted_time and completion_time, which would let me work out which tasks are blocking the cluster.

This would be similar to the extended metadata on the ipyparallel.AsyncResult

A nice-to-have would be to be able to get the args/kwargs for any given task too. This would be especially helpful in debugging failed tasks.

Is any of this functionality available currently or is there any way to get the info I'm after?

Any other suggestions on how to debug the problem would be greatly welcomed.


Solution

  • As of May 2017 no explicit "give me all of the information about a task" operation exists. However, you can use the client to investigate task state directly. This will require you to dive a bit into the information that the scheduler and workers track; the scheduler state and worker documentation pages describe what is available.

    To query this state I would use the Client.run_on_scheduler and Client.run methods. These take a function to run on the scheduler or workers respectively. If this function includes a dask_scheduler or dask_worker argument then the function will be given the scheduler or worker object itself.

    def f(dask_scheduler):
        return dask_scheduler.task_state
    
    client.run_on_scheduler(f)
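    The same trick works on the workers: Client.run calls a function on every worker and hands it the worker object via a dask_worker argument. Here is a minimal sketch, using a throwaway in-process cluster purely for illustration; in practice you would connect your Client to your existing scheduler address:

```python
from dask.distributed import Client

# In-process cluster for demonstration only; normally you would use
# Client("tcp://your-scheduler:8786") or similar.
client = Client(processes=False)

def worker_state(dask_worker):
    # Client.run passes each worker object in via the dask_worker argument
    return {
        "address": dask_worker.address,
        "in_memory": list(dask_worker.data.keys()),  # keys of results this worker holds
    }

fut = client.submit(sum, [1, 2, 3])  # give the cluster something to hold on to
fut.result()

per_worker = client.run(worker_state)  # dict: worker address -> state dict
```

    The result is keyed by worker address, so you can quickly spot a worker that is holding far more keys, or doing far more work, than its peers.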
    

    You now have access to check any state that the scheduler or workers know about and to run any internal diagnostic checks. What you choose to investigate though depends entirely on your use case.

    def f(keys, dask_scheduler=None):
        return dask_scheduler.transition_story(*keys)
    
    client.run_on_scheduler(f, [key1, key2, key3])
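    To get closer to the per-task statistics you're after, you can also walk the scheduler's task collection directly and report each task's current state. Note that attribute names have shifted between distributed versions (older releases expose a task_state dict; more recent ones keep a tasks dict of TaskState objects), so treat this as a sketch rather than a stable API:

```python
from dask.distributed import Client

# In-process cluster for demonstration only
client = Client(processes=False)

def task_states(dask_scheduler):
    # Recent distributed releases keep a `tasks` dict of key -> TaskState
    # on the scheduler; each TaskState carries a `state` attribute such as
    # "processing", "memory", or "erred".
    return {key: ts.state for key, ts in dask_scheduler.tasks.items()}

fut = client.submit(sum, [1, 2, 3])
fut.result()

states = client.run_on_scheduler(task_states)  # maps each task key to its state
```

    Filtering this mapping for tasks stuck in "processing" is a quick way to narrow down which keys are blocking the cluster before digging into their transition stories.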