I'm using Dask to process research batches, which are quite heavy (from a few minutes to a few hours each). There's no communication between the tasks, and they produce only side results. I'm running on a machine that already virtualizes the resources beneath it (~30 CPUs), so I'm just using LocalCluster. Is there any way to assign a specific CPU to a task? The docs only show examples with GPU and memory resources.
I've tried to assign a CPU in a similar way, but the tasks never even start processing:
futures = [client.submit(process, d, resources={'CPU': 1}) for d in data]
The likely reason the tasks didn't start when you specified
futures = [client.submit(process, d, resources={'CPU': 1}) for d in data]
is that the cluster was started without declaring that each worker has that resource (this has to be done when the workers are created). Note also that Dask resources are abstract scheduling labels, so {'CPU': 1} limits how many such tasks each worker runs concurrently rather than pinning a task to a particular physical core. Here's how to make sure the workers have the resource:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(resources={'CPU': 1})
client = Client(cluster)
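Putting it together, here's a minimal end-to-end sketch; process and data stand in for your own task function and batches, and the worker/thread counts are illustrative:
from dask.distributed import Client, LocalCluster

# Illustrative settings: one single-threaded worker per virtual CPU,
# each advertising one unit of the abstract 'CPU' resource.
cluster = LocalCluster(n_workers=30, threads_per_worker=1,
                       resources={'CPU': 1})
client = Client(cluster)

# Each task consumes one 'CPU' unit, so at most one task runs
# per worker at a time.
futures = [client.submit(process, d, resources={'CPU': 1}) for d in data]
client.gather(futures)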
For finer-grained control, it is possible to assign tasks to specific workers. First, get the addresses of each worker with
list_workers = list(client.scheduler_info()['workers'])
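For example (the addresses and ports shown are illustrative and will differ on your machine):
print(list_workers)
# ['tcp://127.0.0.1:33761', 'tcp://127.0.0.1:39925', ...]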
Then specify which worker(s) can complete the task:
# submit for completion only by the first worker in the list
results_specific_worker = [client.submit(process, d, workers=list_workers[0]) for d in data]
# submit for completion by the first two workers
results_specific_workers = [client.submit(process, d, workers=list_workers[0:2]) for d in data]
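These worker constraints are hard requirements by default: a task will wait for its designated worker(s) rather than run elsewhere. If you'd rather treat the constraint as a preference, Client.submit accepts an allow_other_workers flag:
# prefer the first worker, but fall back to any other worker if it is unavailable
results_flexible = [client.submit(process, d, workers=list_workers[0], allow_other_workers=True) for d in data]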