dask dask-distributed

Dask does not use all workers and behaves differently with different number of nodes


I have an embarrassingly parallel scenario: every item (a molecule) is processed by a function, and the returned values are stored in a DB. Each task takes 2-20 minutes, so after some searching I increased the timeouts and changed other parameters, but it still does not work as expected. The behavior depends on the number of nodes used. With 2 nodes the calculations finished (with some issues), but with 10 nodes they were interrupted.

My gut feeling is that the issue is in the Dask config or in the config of the physical cluster. Or maybe the issue is in the way I use it? I would appreciate any suggestions.

Experiments:

I run Dask on my own physical PBS cluster by starting an SSH Dask cluster (as a more general solution). Every node has 32 cores.

dask ssh --hostfile $PBS_NODEFILE --nworkers 32 --nthreads 1 &

$PBS_NODEFILE is a file with a list of the IP addresses of the computing nodes.

Then I run a Python script:

python run.py

The script contains:

import dask
from dask.distributed import Client, as_completed
dask.config.set({'distributed.scheduler.allowed-failures': 30})
dask.config.set({'distributed.scheduler.work-stealing-interval': '1minutes'}) 
dask.config.set({'distributed.scheduler.worker-ttl': None}) 
dask.config.set({'distributed.scheduler.unknown-task-duration': '1h'})  
dask.config.set({'distributed.worker.lifetime.restart': True})
dask.config.set({'distributed.worker.profile.interval': '100ms'})
dask.config.set({'distributed.comm.timeouts.connect': '30minutes'}) 
dask.config.set({'distributed.comm.timeouts.tcp': '30minutes'}) 
dask.config.set({'distributed.comm.retry.count': 20})
dask.config.set({'distributed.admin.tick.limit': '3h'})
dask.config.set({'distributed.deploy.lost-worker-timeout': '30minutes'})
dask_client = Client(open(args.hostfile).readline().strip() + ':8786')  # the first node from the list will be used for the scheduler

for future, res in as_completed(dask_client.map(mol_dock, items), with_results=True):  # mol_dock is a function which performs main calculations
    ...  # process res

The number of input items used for testing is 10000, but this workflow is expected to work on millions. Every item is small, on the order of kilobytes (a molecule). An individual computation may last from 2 to 20 minutes.

2 nodes

With 2 nodes the calculations finish as expected (no errors in STDERR). However, the nodes are not fully loaded. After some time some workers were restarted (judging by the running time shown in htop) and some were closed (the overall number of workers is lower than expected). The overall runtime is 11 hours.

STDOUT

# multiple such messages
distributed.core - INFO - Event loop was unresponsive in Worker for 72.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Below is the lifetime of a single worker that was started almost immediately after launching the application. For some reason it was closed after 10 minutes of real time for not responding within 300 s, even though I had increased the timeouts to much larger values.

[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 20:52:55,256 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://158.XXX.XXX.XXX:44072', status: init, memory: 0, processing: 0>
[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 20:52:55,258 - distributed.scheduler - INFO - Starting worker compute stream, tcp://158.XXX.XXX.XXX:44072
[ worker 158.XXX.XXX.XXX ] : 2023-04-20 20:52:55,237 - distributed.worker - INFO -       Start worker at: tcp://158.XXX.XXX.XXX:44072
[ worker 158.XXX.XXX.XXX ] : 2023-04-20 20:52:55,237 - distributed.worker - INFO -          Listening to: tcp://158.XXX.XXX.XXX:44072
[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 21:02:52,128 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://158.XXX.XXX.XXX:44072', status: running, memory: 0, processing: 2>
[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 21:02:52,128 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://158.XXX.XXX.XXX:44072', status: running, memory: 0, processing: 2>
[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 21:02:52,128 - distributed.core - INFO - Removing comms to tcp://158.XXX.XXX.XXX:44072
[ scheduler 158.XXX.XXX.XXX:8786 ] : 2023-04-20 21:04:10,431 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://158.XXX.XXX.XXX:44072'.
[ worker 158.XXX.XXX.XXX ] : 2023-04-20 21:04:10,431 - distributed.worker - INFO - Stopping worker at tcp://158.XXX.XXX.XXX:44072. Reason: scheduler-remove-worker
[ worker 158.XXX.XXX.XXX ] : 2023-04-20 21:04:10,433 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://158.XXX.XXX.XXX:44072'. Shutting down.
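A possible reason the 300-second limit still applied, stated here as an assumption rather than a confirmed diagnosis: dask.config.set only changes the configuration of the process that calls it (here, the client script), while the scheduler and workers started by dask ssh read their own configuration. Dask also reads settings from environment variables prefixed with DASK_, with nesting written as a double underscore. The helper below (dask_env_var is a hypothetical name, not part of Dask) only shows how a config key maps to such a variable name; the variables would still have to be set in the environment of the scheduler and worker processes on each node.

```python
def dask_env_var(key: str) -> str:
    """Map a Dask config key to its environment-variable name.

    Hypothetical helper for illustration: Dask's convention is a DASK_ prefix,
    upper case, '__' for nesting, and '_' in place of '-'.
    """
    return "DASK_" + key.upper().replace("-", "_").replace(".", "__")


# Keys taken from the script above; values are left out on purpose.
for key in (
    "distributed.scheduler.worker-ttl",
    "distributed.comm.timeouts.connect",
    "distributed.comm.timeouts.tcp",
):
    print(key, "->", dask_env_var(key))
```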

Dask Performance Report. Instead of 32 x 2 = 64 workers, only 15 were reported. However, htop showed all cores occupied at the beginning.

Duration: 11hr 11m
Tasks Information

    number of tasks: 11000
    compute time: 10d 9hr

Scheduler Information

    Address: tcp://158.XXX.XXX.XXX:8786
    Workers: 15
    Threads: 15
    Memory: 22.02 GiB
    Dask Version: 2023.2.0
    Dask.Distributed Version: 2023.2.0
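As a rough sanity check on these numbers, the average number of busy workers can be estimated by dividing the total compute time by the wall-clock duration. This is only a sketch and assumes that "compute time" in the report is the sum of the per-task compute durations; the variable names are illustrative.

```python
from datetime import timedelta

# Figures copied from the 2-node performance report above.
wall_clock = timedelta(hours=11, minutes=11)  # "Duration: 11hr 11m"
compute_time = timedelta(days=10, hours=9)    # "compute time: 10d 9hr"
expected_workers = 32 * 2                     # 32 workers on each of 2 nodes

# Dividing one timedelta by another yields a plain float.
avg_busy_workers = compute_time / wall_clock
utilization = avg_busy_workers / expected_workers

print(f"average busy workers: {avg_busy_workers:.1f}")
print(f"share of the expected {expected_workers} workers: {utilization:.0%}")
```

Under that assumption, roughly 22 workers were busy on average, which is about a third of the 64 expected.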


10 nodes

With 10 nodes the calculations were interrupted after 40-45 minutes (about 40% of all tasks had been processed). I also observed that some workers were restarted or closed after approximately 10-12 minutes, and the number of workers gradually dropped to 0. This behavior is consistent: repeated runs stop after approximately the same run time.

STDERR

/home/pavlop/anaconda3/envs/vina_cache/lib/python3.9/site-packages/paramiko/transport.py:219: CryptographyDeprecationWarning: Blowfish has been deprecated
  "class": algorithms.Blowfish,
Traceback (most recent call last):
  File "/home/pavlop/python/docking-scripts/moldock/run_dock.py", line 191, in <module>
    main()
  File "/home/pavlop/python/docking-scripts/moldock/run_dock.py", line 168, in main
    for i, (mol_id, res) in enumerate(docking(mols,
  File "/home/pavlop/python/docking-scripts/moldock/run_dock.py", line 42, in docking
    for future, (mol_id, res) in as_completed(dask_client.map(dock_func,
  File "/home/pavlop/anaconda3/envs/vina_cache/lib/python3.9/site-packages/distributed/client.py", line 5118, in __next__
    return self._get_and_raise()
  File "/home/pavlop/anaconda3/envs/vina_cache/lib/python3.9/site-packages/distributed/client.py", line 5107, in _get_and_raise
    raise exc.with_traceback(tb)
distributed.scheduler.KilledWorker: Attempted to run task mol_dock-aceba7791658ab7cd53d047f97edc803 on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://158.XXX.XXX.XXX:46460. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
2023-04-21 09:45:09,366 - distributed.utils_comm - INFO - Retrying functools.partial(<function PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc at 0x7f0a602684c0>, keys=['mol_dock-d6183800b4deda2c3b2bb52662bbb515', 'mol_dock-ea25e47a07897301e77a5a1b35b31805', 'mol_dock-9e2bbc9776658f09404c18097f730f34', 'mol_dock-9e43fc9d41b9277cf7951308035b90e6', 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66']) after exception in attempt 0/20: in <TCP (closed) ConnectionPool.gather local=tcp://158.XXX.XXX.XXX:54426 remote=tcp://158.XXX.XXX.XXX:8786>: Stream is closed
2023-04-21 09:45:09,401 - distributed.client - WARNING - Couldn't gather 5 keys, rescheduling {'mol_dock-ea25e47a07897301e77a5a1b35b31805': (), 'mol_dock-d6183800b4deda2c3b2bb52662bbb515': (), 'mol_dock-9e43fc9d41b9277cf7951308035b90e6': (), 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66': (), 'mol_dock-9e2bbc9776658f09404c18097f730f34': ()}
2023-04-21 09:45:09,404 - distributed.client - WARNING - Couldn't gather 5 keys, rescheduling {'mol_dock-ea25e47a07897301e77a5a1b35b31805': (), 'mol_dock-d6183800b4deda2c3b2bb52662bbb515': (), 'mol_dock-9e43fc9d41b9277cf7951308035b90e6': (), 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66': (), 'mol_dock-9e2bbc9776658f09404c18097f730f34': ()}
2023-04-21 09:45:09,407 - distributed.client - WARNING - Couldn't gather 5 keys, rescheduling {'mol_dock-ea25e47a07897301e77a5a1b35b31805': (), 'mol_dock-d6183800b4deda2c3b2bb52662bbb515': (), 'mol_dock-9e43fc9d41b9277cf7951308035b90e6': (), 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66': (), 'mol_dock-9e2bbc9776658f09404c18097f730f34': ()}
2023-04-21 09:45:09,409 - distributed.client - WARNING - Couldn't gather 5 keys, rescheduling {'mol_dock-ea25e47a07897301e77a5a1b35b31805': (), 'mol_dock-d6183800b4deda2c3b2bb52662bbb515': (), 'mol_dock-9e43fc9d41b9277cf7951308035b90e6': (), 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66': (), 'mol_dock-9e2bbc9776658f09404c18097f730f34': ()}
2023-04-21 09:45:09,411 - distributed.client - WARNING - Couldn't gather 5 keys, rescheduling {'mol_dock-ea25e47a07897301e77a5a1b35b31805': (), 'mol_dock-d6183800b4deda2c3b2bb52662bbb515': (), 'mol_dock-9e43fc9d41b9277cf7951308035b90e6': (), 'mol_dock-c5fa32ccb2f87ec6d3c149b506735c66': (), 'mol_dock-9e2bbc9776658f09404c18097f730f34': ()}

STDOUT is almost identical to the previous one.

Dask Performance Report. Only 168 workers were reported out of the expected 320. The number of tasks that actually finished is 4290 out of the 5175 tasks reported.

Duration: 39m 33s
Tasks Information

    number of tasks: 5175
    compute time: 7d 4hr
    disk-read time: 5.28 ms

Scheduler Information

    Address: tcp://158.XXX.XXX.XXX:8786
    Workers: 168
    Threads: 168
    Memory: 246.62 GiB
    Dask Version: 2023.2.0
    Dask.Distributed Version: 2023.2.0



Solution

  • The suggestion of @Guillaume was correct. Among other things, I used a Python C extension that was probably compiled without releasing the GIL. I was not aware of such issues before. I fixed the problem by creating a separate Python script that performs all calculations and writes a JSON file as output. I run this script in a shell via subprocess and parse the output JSON. This adds some overhead, but for such long computations it is negligible.
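A minimal sketch of that workaround, not the actual code from the project: the child script, the function name mol_dock_subprocess, and the placeholder "score" are all made up for illustration. The point is that the GIL-holding work runs in a separate interpreter, so the Dask worker process stays free to send heartbeats while it waits for the child to finish.

```python
import json
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the separate calculation script. In the real
# setup this would be a file that calls the C extension doing the docking.
CHILD_SCRIPT = """
import json, sys
mol_id, out_path = sys.argv[1], sys.argv[2]
result = {"mol_id": mol_id, "score": float(len(mol_id))}  # placeholder computation
with open(out_path, "w") as fh:
    json.dump(result, fh)
"""


def mol_dock_subprocess(mol_id, timeout=3600):
    """Run the calculation in a child interpreter and return the parsed JSON."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "dock_one.py"
        script.write_text(CHILD_SCRIPT)
        out_path = Path(tmp) / "result.json"
        # check=True raises CalledProcessError if the child exits with an error.
        subprocess.run(
            [sys.executable, str(script), mol_id, str(out_path)],
            check=True,
            timeout=timeout,
        )
        return json.loads(out_path.read_text())


print(mol_dock_subprocess("mol_0001"))
```

A function like this could be passed to dask_client.map in place of the original one. The timeout value is an arbitrary choice for the sketch.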