Search code examples
python-3.xdaskdask-distributeddask-delayedfastparquet

Dask - How to cancel and resubmit stalled tasks?


Frequently, I encounter an issue where Dask randomly stalls on a couple tasks, usually tied to a read of data from a different node on my network (more details about this below). This can happen after several hours of running the script with no issues. It will hang indefinitely in a form shown below (this loop otherwise takes a few seconds to complete):

enter image description here

In this case, I see that there just a handful of stalled processes, and all are on one particular node (192.168.0.228): enter image description here

Each worker on this node is stalled on a couple read_parquet tasks:

enter image description here

This was called using the following code and is using fastparquet:

ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)

My cluster is running Ubuntu 19.04 and all the latest versions (as of 11/12) of Dask and Distributed and the required packages (e.g., tornado, fsspec, fastparquet, etc.)

The data that the .228 node is trying to access is located on another node in my cluster. The .228 node accesses the data through CIFS file sharing. I run the Dask scheduler on the same node on which I'm running the script (different from both the .228 node and the data storage node). The script connects the workers to the scheduler via SSH using Paramiko:

ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
                                                            ' --name ' + comp_name_decode +
                                                            ' --nprocs ' + str(nproc_int) +
                                                            ' --nthreads 10 ' +
                                                            self.dask_scheduler_ip, get_pty=True)  

The connectivity of the .228 node to the scheduler and to the data storing node all look healthy. It is possible that the .228 node experienced some sort of brief connectivity issue while trying to process the read_parquet task, but if that occurred, the connectivity of .228 node to the scheduler and the CIFS shares were not impacted beyond that brief moment. In any case, the logs do not show any issues. This is the whole log from the .228 node:

distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445

distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445

distributed.worker - INFO - dashboard at: 192.168.0.228:37751

distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 2

distributed.worker - INFO - Memory: 14.53 GB

distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

Putting aside whether this is a bug in Dask or in my code/network, is it possible to set a general timeout for all tasks handled by the scheduler? Alternatively, is it possible to:

  1. identify stalled tasks,
  2. copy a stalled task and move it to another worker, and
  3. cancel the stalled task?

Solution

  • is it possible to set a general timeout for all tasks handled by the scheduler?

    As of 2019-11-13 unfortunately the answer is no.

    If a task has properly failed then you can retry that task with client.retry(...) but there is no automatic way to have a task fail itself after a certain time. This is something that you would have to write into your Python functions yourself. Unfortunately it is hard to interrupt a Python function in another thread, which is partially why this is not implemented.

    If the worker goes down then things will be tried elsewhere. However from what you say it sounds like everything is healthy, it's just that the tasks themselves are likely to take forever. It's hard to identify this as a failure case unfortunately.