Tags: dask, dask-distributed

Dask: submitting tasks with resource constraints does not work


I submit a Dask task like this:

from dask.distributed import Client

client = Client(cluster)
future = client.submit(
    # dask task
    my_dask_task,  # a task that consumes at most 100 MiB

    # task arguments
    arg1,
    arg2,
)

Everything works fine.

Now I set some resource constraints:

client = Client(cluster)
future = client.submit(
    # dask task
    my_dask_task,  # a task that consumes at most 100 MiB

    # task arguments
    arg1,
    arg2,

    # resource constraints at the Dask scheduler level
    resources={
        'process': 1,
        'memory': 100*1024*1024  # 100 MiB
    }
)

The problem is that, in this case, the future is never resolved and the Python program waits forever. This happens even with only 'process': 1 and/or a very small amount of RAM such as 'memory': 10, which is strange.

Beyond this reduced example, in my real-world application a given Dask worker is configured with multiple processes and may therefore run multiple tasks at the same time.

So I want to declare the amount of RAM each task needs, to keep the Dask scheduler from placing tasks on a worker in a way that leads to out-of-memory errors.

Why doesn't it work as expected? How can I debug this?

Thank you


Solution

  • Adding to @pavithraes's comment - the resources argument to client.submit and other scheduling calls does NOT modify the available workers. Instead, it creates a constraint on the workers that can be used for the given tasks. Importantly, the terms you use here, "process" and "memory", are not interpreted by Dask in terms of physical hardware - they are simply labels you define, which Dask uses to filter the available workers down to those that match your tag criteria.

    From the dask docs:

    Resources listed in this way are just abstract quantities. We could equally well have used terms “mem”, “memory”, “bytes” etc. above because, from Dask’s perspective, this is just an abstract term. You can choose any term as long as you are consistent across workers and clients.

    It’s worth noting that Dask separately track number of cores and available memory as actual resources and uses these in normal scheduling operation.

    Because of this, your tasks hang forever: the scheduler is waiting for workers that meet your conditions to appear so that it can schedule these tasks. Unless you create workers with these tags applied, the jobs will never start (see the sketch at the end of this answer).

    See the dask docs on specifying and using worker resources, and especially the section on Specifying Resources, for more information about how to configure workers so that these resource constraints can be applied.
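
    For illustration, here is a minimal sketch, assuming a LocalCluster and that resources= is accepted as a worker keyword argument; my_dask_task, arg1 and arg2 are the placeholders from the question. The workers declare the same abstract 'process' and 'memory' tags that the client.submit call later requests, so the scheduler can actually place the task:

    # Minimal sketch (assumptions: LocalCluster forwards resources= to its
    # workers; my_dask_task, arg1, arg2 are placeholders from the question).
    from dask.distributed import Client, LocalCluster

    # Each worker advertises the abstract resources 'process' and 'memory'.
    # With standalone workers you would pass the same tags on the command
    # line, e.g.: dask-worker tcp://scheduler:8786 --resources "process=1 memory=104857600"
    cluster = LocalCluster(
        n_workers=2,
        threads_per_worker=1,
        resources={'process': 1, 'memory': 100 * 1024 * 1024},
    )
    client = Client(cluster)

    # The resources requested here are matched against the tags declared
    # above; the task only runs on a worker whose remaining 'process' and
    # 'memory' quantities cover the request.
    future = client.submit(
        my_dask_task,
        arg1,
        arg2,
        resources={'process': 1, 'memory': 100 * 1024 * 1024},
    )

    With matching tags on both sides, the future resolves as usual; without them, the task stays pending exactly as described in the question.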