I need to return a tuple from a Dask task and unpack it in the main process, because each element of the tuple will be passed to a different Dask task. I would like to avoid unnecessary communication, so I think the tuple elements should be Futures.
The best way I came up with is to scatter the data from inside the task, targeting the same worker, just to get a Future back. Is there a better way to do this?
import time

import numpy as np
from dask.distributed import Client, get_client, get_worker


def other_task(data):
    return len(data)


def costly_task():
    start = time.time()
    client = get_client()
    data = np.arange(100000000)
    metadata = len(data)
    print('worker time', time.time() - start)
    start = time.time()
    # Scatter to this same worker (by address) so the data never moves;
    # we only get Futures back to return to the main process.
    here = get_worker().address
    future_data = client.scatter(data, workers=[here])
    future_metadata = client.scatter(metadata, workers=[here])
    print('scatter time', time.time() - start)
    return future_data, future_metadata
if __name__ == '__main__':
    client = Client(processes=True)
    start = time.time()
    # The task returns Futures, so .result() hands back the Futures themselves.
    future_data, future_metadata = client.submit(costly_task).result()
    metadata = future_metadata.result()
    print('costly task time', time.time() - start)
    # future_data is consumed by another task without round-tripping the array.
    other_metadata = client.submit(other_task, future_data).result()
    print('total time', time.time() - start)
The times I get with the script above are:
worker time 0.12443423271179199
scatter time 0.7880995273590088
costly task time 0.923513650894165
total time 0.9366424083709717
It is possible via delayed:
from dask.distributed import Client
from dask import delayed


@delayed(nout=3)
def costly_task():
    return 1, 2, 3


client = Client(processes=True)

a, b, c = costly_task()  # these are Delayed objects
future_a, future_b, future_c = client.compute([a, b, c])  # one Future per element
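Those futures can then feed other tasks directly. A minimal follow-up sketch (downstream here is a hypothetical helper, not part of the question): passing a Future to client.submit resolves it on a worker, so the value behind future_a never round-trips through the main process:

# Hypothetical downstream task, just for illustration.
def downstream(x):
    return x + 1

# The scheduler runs downstream where future_a's result already lives
# (or moves it worker-to-worker), bypassing the client entirely.
other_future = client.submit(downstream, future_a)
print(other_future.result())  # 2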
See this Q&A for further details.