dask, dask-distributed

Dask distributed pause task to wait for subtask - how-to, or bad practice?


I am running tasks using client.submit thus:

from dask.distributed import Client, get_client, wait, as_completed
# other imports

client = Client()  # connect to the cluster (arguments omitted here)

# path and file_lists are defined elsewhere in the script
zip_and_upload_futures = [client.submit(zip_and_upload, id, path, file_list) for id, file_list in enumerate(file_lists)]
upload_futures = []
failed_list = []

for future in as_completed(zip_and_upload_futures):
    upload_future, zip_name = future.result()  # zip_and_upload returns (future, name)
    upload_futures.append(upload_future)
    print(f'Zip {zip_name} created and added to upload queue.')
    del future

for future in as_completed(upload_futures):
    if future.status == 'finished':
        print(f'Zip {future.result()[0]} uploaded.')
        del future
    elif future.status == 'error':
        # status is 'error' when the task raised; keep the exception and traceback together
        failed_list.append((future.exception(), future.traceback()))
        del future

In zip_and_upload(id, path, file_list), the files are zipped into in-memory, file-like (BytesIO) objects because I'm short on local disk space, and the zip objects are then uploaded.

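def zip_and_upload(id, path, file_list):
    client = get_client()  # client connected from inside the running task
    # compression is a setting defined elsewhere in the script
    zip_future = client.submit(zip_file_list, id, file_list, compression)
    wait(zip_future)  # block this task until the zip subtask finishes
    zip_data, name_list = zip_future.result()  # tuple == (zip_data, name_list)
    future = client.submit(upload, id, zip_data, name_list)
    return future, f'{path}_{id}.zip'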
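zip_file_list itself isn't shown. Assuming it receives paths to files readable from the worker and a zipfile compression constant (both assumptions), a hypothetical version that builds the archive in a BytesIO buffer instead of on disk might look like this. Returning the raw bytes rather than the BytesIO object keeps the result simple for Dask to move between workers.

```python
import io
import os
import zipfile


def zip_file_list(idx, file_list, compression=zipfile.ZIP_DEFLATED):
    """Hypothetical sketch: zip files in memory and return (zip_data, name_list).

    idx is accepted only to match the call in the question; it is unused here.
    """
    buffer = io.BytesIO()  # in-memory file-like object instead of a temp file
    with zipfile.ZipFile(buffer, mode="w", compression=compression) as zf:
        for file_path in file_list:
            # Store each file under its base name inside the archive.
            zf.write(file_path, arcname=os.path.basename(file_path))
        name_list = zf.namelist()
    # After the ZipFile is closed, the buffer holds the complete archive.
    return buffer.getvalue(), name_list
```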

I get a lot of errors from the scheduler along the lines of:

distributed.scheduler - ERROR - Couldn't gather keys: {'zip_file_list-<hash>': 'queued'}

Questions:

  1. Is this a viable use of Dask, with tasks that submit their own subtasks?
  2. Is there a better way? (I'm sure there is, but ideally one that doesn't require much refactoring and still uses Dask.) I tried fire_and_forget for the upload step, but I want to catch any upload failures.
  3. Why am I getting errors saying the zip_file_list task is queued? I'm trying to make that task blocking anyway, so shouldn't the scheduler let it queue and then execute it?

Many thanks.


Solution

  • As described in the Dask documentation on launching tasks from tasks, this pattern

    can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that they are waiting on results and are free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.

    I think this is what you are encountering. I don't think you really need to submit these tasks, especially because you wait for the first one to finish before submitting the second. You could simply remove the client.submit calls inside your nested function and call zip_file_list and upload directly.

    If you really need this kind of workflow, then please read the page mentioned above:

    To avoid this deadlocking issue we can use secede and rejoin. These functions will remove and rejoin the current task from the cluster respectively.
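A minimal sketch of both suggestions, assuming hypothetical stand-ins for zip_file_list and upload (the question doesn't show their real bodies) and an in-process test cluster. zip_and_upload_flat drops the nested submits entirely; zip_and_upload_seceded keeps them but calls secede() before blocking and rejoin() afterwards. In both, the upload result is awaited inside the outer task, so an upload failure surfaces as an error on that task, which the driver's as_completed loop can collect. The names zip_and_upload_flat, zip_and_upload_seceded and run_all are illustrative only.

```python
from dask.distributed import Client, as_completed, get_client, rejoin, secede


def zip_file_list(idx, file_list):
    # Hypothetical stand-in: returns (zip_data, name_list) like the real one.
    return f"zip-{idx}".encode(), list(file_list)


def upload(idx, zip_data, name_list):
    # Hypothetical stand-in for the real upload; an empty batch simulates a failure.
    if not name_list:
        raise ValueError(f"nothing to upload for batch {idx}")
    return len(name_list)


def zip_and_upload_flat(idx, path, file_list):
    """Option 1: no nested submits; both steps run inside this one task."""
    zip_data, name_list = zip_file_list(idx, file_list)
    upload(idx, zip_data, name_list)  # an exception here fails this task
    return f"{path}_{idx}.zip"


def zip_and_upload_seceded(idx, path, file_list):
    """Option 2: keep the subtasks, but leave the thread pool while blocking."""
    client = get_client()
    zip_future = client.submit(zip_file_list, idx, file_list)
    secede()  # this task stops occupying a worker thread slot
    try:
        zip_data, name_list = zip_future.result()
        upload_future = client.submit(upload, idx, zip_data, name_list)
        upload_future.result()  # re-raises any upload exception in this task
    finally:
        rejoin()  # wait for a free slot, then continue as a normal task
    return f"{path}_{idx}.zip"


def run_all(client, func, path, file_lists):
    """Submit one task per batch and split results into successes and failures."""
    futures = [
        client.submit(func, idx, path, file_list)
        for idx, file_list in enumerate(file_lists)
    ]
    uploaded, failed = [], []
    for future in as_completed(futures):
        if future.status == "finished":
            uploaded.append(future.result())
        elif future.status == "error":
            failed.append((future.key, future.exception()))
    return uploaded, failed


if __name__ == "__main__":
    # One worker with two threads: without secede(), two blocked outer tasks
    # could occupy every slot while their subtasks wait in the queue.
    with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
        ok, bad = run_all(client, zip_and_upload_seceded, "archive", [["a.txt"], []])
        print(sorted(ok), bad)
```

If you prefer a context manager, dask.distributed also provides worker_client(), which secedes on entry and rejoins on exit, so you don't have to pair the calls manually.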