I have written a short Python script using Dask to orchestrate a workflow. It launches a first batch of jobs, no larger than the number of available workers. Then, using as_completed(), it monitors the status of the futures, and each time a job finishes, a new one is submitted and added to the sequence.
import random
import time

from dask.distributed import Client, as_completed, get_client


def inc(x):
    time.sleep(4 + random.random())
    return x + 1


def orchestrate(n):
    job_count = 0
    nb_workers = 2
    client = get_client()
    futures = []
    print(len(n))
    # Do not launch or put into the graph more tasks than available workers
    for i in range(nb_workers):
        futures.append(client.submit(inc, n[i]))
        job_count += 1
    print("Total number of jobs:", job_count)
    # Use a sequence to check futures as they are completed
    sequence = as_completed(futures)
    print("Remaining futures:", sequence.count())
    for seq in sequence:
        if job_count < len(n):
            new_future = client.submit(inc, n[job_count])
            print(sequence)
            sequence.add(new_future)
            job_count += 1
            print("Number of jobs submitted:", job_count)
            print("Remaining futures:", sequence.count())
        elif sequence.count() == 0:
            break
        else:
            print("Remaining futures:", sequence.count())
    return None
However, I have noticed some strange behavior. If the code is run as below, it seems to work as expected: the correct number of jobs is submitted to the client, and the cell finishes executing in the Jupyter notebook.
x = [i for i in range(4)]
orchestrate(x)  # This works fine, 4 jobs are submitted.
However, my concern is that, done this way, the Jupyter notebook is blocked until the cell finishes executing. So I would like to use submit instead to execute the method, thus not blocking the notebook (so that I can quit JupyterLab and the execution will not be stopped), like below:
futs = client.submit(orchestrate, x)
However, this does not work as expected. The future's status always remains "pending": all jobs but one are submitted and executed, and the last one never runs.
Note that I do not care about retrieving the result of the executed method (in my real situation, a file is written to disk as the result).
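Since the result is not needed, I believe fire_and_forget from dask.distributed could be combined with this, so that the scheduler keeps computing the task even after the notebook releases the future, something like:

from dask.distributed import fire_and_forget

futs = client.submit(orchestrate, x)
fire_and_forget(futs)  # keep the task alive even if no client holds a reference to futs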
How can this behavior be explained? The method orchestrate is still the same; it is only being submitted to the client...
Am I doing something wrong? Should I proceed differently in order not to block the execution of the notebook?
After copy-pasting your code and running it on my machine, I see full execution of the script. I am running the latest versions of dask (2021.03.0) and distributed (2021.03.0).
When executing on actual data, there is a potential problem with using get_client() inside a task, as per the docs:

However, this can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that they are waiting on results and are free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.

This could be what you are seeing: the submitted orchestrate task itself occupies a scheduling slot on a worker while it waits for the inc futures it spawned.
To avoid this, the docs suggest using secede/rejoin or a worker_client context manager.
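A minimal sketch of what that could look like here, assuming the same inc function as above and keeping the rest of your loop logic unchanged:

from dask.distributed import as_completed, worker_client

def orchestrate(n):
    # worker_client() secedes from the worker's thread pool on entry and
    # rejoins on exit, so this task does not hold a scheduling slot
    # while it waits on the futures it submits.
    with worker_client() as client:
        nb_workers = 2
        futures = [client.submit(inc, n[i]) for i in range(nb_workers)]
        job_count = nb_workers
        sequence = as_completed(futures)
        for _ in sequence:
            if job_count < len(n):
                sequence.add(client.submit(inc, n[job_count]))
                job_count += 1
    return None

Alternatively, secede() and rejoin() (also importable from dask.distributed) give the same control manually when a with block is inconvenient.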