I am trying to use Dask to distribute calculations over multiple systems. However, I am missing some concept, because I cannot reproduce with Dask the behavior I expected from a simple test I previously used with Python multiprocessing.
I am using this very naive code:
```python
import dask
from dask.distributed import Client
import time

def costly_simulation(p):
    time.sleep(4)
    return p * 2

if __name__ == "__main__":
    client = Client('localhost:8786')
    input_array = [10, 10, 10, 10, 10, 10, 10, 10]
    futures = []
    for p in input_array:
        future = client.submit(costly_simulation, p)
        futures.append(future)
    results = client.gather(futures)
    print(results[:])
```
I start a Dask cluster in two separate shells:
```
dask-scheduler
```
and
```
dask-worker --nworkers 1 --nthreads 1 localhost:8786
```
So everything runs on the local machine, with a single worker process running a single thread, so only one calculation should run at a time.
However, whatever I do, execution always takes a little over 4 s (the sleep time plus some overhead).
I don't understand why: with only one worker process, I expected it to take 8 × 4 s = 32 s. Even with a 100-item input ([10] * 100), it still takes about 4 s.
Note that the output is always correct: a list of 20s.
On the scheduler side, I see the following logs:
```
2023-03-24 14:58:08,929 - distributed.scheduler - INFO - Receive client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:08,930 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:46334
2023-03-24 14:58:12,981 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,981 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:46334; closing.
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Remove client Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
2023-03-24 14:58:12,982 - distributed.scheduler - INFO - Close client connection: Client-4a1b3e7f-ca54-11ed-a87a-853d3d7f4042
```
What am I missing?
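This was covered in an answer to a related question. Specifically, you are passing the same input to the same function, and by default Dask assumes the function is pure (the same inputs always return the same value). `client.submit` derives each task's key from a hash of the function and its arguments, so all your submissions map to the same key and the task is computed only once.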
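The key sharing can be observed directly. The sketch below is a minimal demo under a few assumptions: it replaces the two-shell setup with an in-process `LocalCluster` (one worker, one thread, `processes=False`) and shortens the sleep to 0.1 s. It submits the same call eight times with the default `pure=True` and collects the futures' keys.

```python
import time
from dask.distributed import Client, LocalCluster

def costly_simulation(p):
    time.sleep(0.1)  # shortened from 4 s for the demo
    return p * 2

def submit_identical(n=8):
    """Submit n identical calls with the default pure=True; return (results, keys)."""
    # processes=False keeps the scheduler and its single worker in this process
    with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster, \
            Client(cluster) as client:
        futures = [client.submit(costly_simulation, 10) for _ in range(n)]
        return client.gather(futures), {f.key for f in futures}

if __name__ == "__main__":
    results, keys = submit_identical()
    print(results)  # eight 20s, even though the scheduler only holds one task
    print(keys)     # a single key of the form 'costly_simulation-<hash>'
```

Every future points at the same key, which is why the gathered list is still correct while the wall time stays close to a single call.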
If you want the function to be recomputed even when the input is the same, pass the `pure=False` keyword argument:
```python
future = client.submit(costly_simulation, p, pure=False)
```
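The sketch below applies the fix to the whole loop and times it. It uses `client.map` with `pure=False`, which submits one task per element; this is an alternative to the `client.submit` loop, not something the answer prescribes. As assumptions for a self-contained demo, an in-process `LocalCluster` with one worker and one thread stands in for the `dask-scheduler`/`dask-worker` shells, and the sleep is shortened to 0.5 s.

```python
import time
from dask.distributed import Client, LocalCluster

def costly_simulation(p):
    time.sleep(0.5)  # shortened from the question's 4 s
    return p * 2

def run(input_array):
    """Run one task per input on a 1-worker, 1-thread cluster.

    Returns (results, number of distinct task keys, elapsed seconds).
    """
    # Stand-in for the two-shell setup; with the real scheduler running,
    # Client("localhost:8786") could be used instead.
    with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster, \
            Client(cluster) as client:
        start = time.perf_counter()
        # pure=False gives every element its own key, even for equal inputs
        futures = client.map(costly_simulation, input_array, pure=False)
        results = client.gather(futures)
        elapsed = time.perf_counter() - start
        return results, len({f.key for f in futures}), elapsed

if __name__ == "__main__":
    results, n_keys, elapsed = run([10] * 8)
    print(results, n_keys)
    print(f"{elapsed:.1f} s")  # about 8 x 0.5 s: the single thread runs tasks one at a time
```

With distinct keys, the single-threaded worker has to execute every task in turn, so the run time scales with the input length as originally expected.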