python, dask, dask-distributed

Dask worker has different imports than main thread


I have a dask delayed function that uses some options defined in another submodule. There's also a third module that modifies these options when imported.

If the imports happen after the if __name__ == '__main__': check (in a notebook, for example), running the function through a distributed client ignores the modified options.

Is there a way to make sure the client's workers have done the same "imports" as the main thread before running any computation?


Here's an MWE; it uses three Python modules:

constants.py:

N = 1

add.py:

import dask
import constants as c


@dask.delayed
def add(da):
    return da + c.N

overhead.py:

import constants

constants.N = 4

Then if I run the following script, it works (output is 5):

from dask.distributed import Client
import dask
import add
import overhead

if __name__ == '__main__':
    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1))[0])

But if we import the submodules after the if __name__ == '__main__': check, it fails (the result is 2):

from dask.distributed import Client
import dask

if __name__ == '__main__':
    import add
    import overhead
    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1)))

Another workaround is to trigger some code that touches the overhead module on each worker, for example:

from dask.distributed import Client
import dask

if __name__ == '__main__':
    import add
    import overhead
    c = Client(n_workers=2, threads_per_worker=2)
    c.run(lambda: overhead)
    print(dask.compute(add.add(1)))

This works, but it requires knowing which module needs to be "triggered". Is there a more generic solution?


Of course, this is a simplified example. For context, my real-life issue involves using intake-esm to read files (the culprit function is intake_esm.source._open_dataset). I have another package that calls intake_esm.utils.set_options upon import. That option change is not respected when I run the workflow in a notebook, but it works if I run it as a script (with all imports at the top of the file).


Solution

  • When you start the dask distributed cluster, new subprocesses are created. Unless you use fork (see the note below), this involves reimporting the current script, under a name other than __main__. That is precisely how the if block prevents each worker process from also trying to create clusters-within-clusters. So, by placing your imports inside the if block, you stop the workers from executing them.

    Dask generally aims to be functional/stateless, so that each function call produces results based only on the arguments it is supplied. If you need to set up process state (such as executing imports or setting config), you should use the preload mechanism: https://docs.dask.org/en/stable/how-to/customize-initialization.html. You can also attempt to customise the serialisation of your arguments to include dynamic state, perhaps as a closure, but these are advanced topics.
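
    For the MWE above, a minimal preload sketch could look like the following. It assumes the overhead module is importable by the worker processes (e.g. same working directory); the preload entry can also be a path to a script, and the config key is only one of the ways to set it described in the linked docs.

    from dask.distributed import Client
    import dask

    if __name__ == '__main__':
        import add
        import overhead

        # Ask every worker process to import `overhead` at startup, so it
        # applies the same constants.N = 4 override before running any task.
        dask.config.set({"distributed.worker.preload": ["overhead"]})

        c = Client(n_workers=2, threads_per_worker=2)
        print(dask.compute(add.add(1))[0])  # expected: 5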

    Note: fork is not recommended because of the various other problems it causes.
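
    As an alternative not mentioned in the answer above, a worker plugin can perform the same setup on every worker, including workers that join later. A minimal sketch, again assuming the overhead module is importable by the worker processes:

    from dask.distributed import Client
    from distributed import WorkerPlugin
    import dask


    class ImportOverhead(WorkerPlugin):
        def setup(self, worker):
            # Runs once per worker; importing `overhead` applies the
            # constants.N = 4 override before any task uses constants.
            import overhead  # noqa: F401


    if __name__ == '__main__':
        import add
        import overhead

        c = Client(n_workers=2, threads_per_worker=2)
        # Blocks until setup() has run on all current workers.
        # (Recent distributed versions also provide Client.register_plugin.)
        c.register_worker_plugin(ImportOverhead())
        print(dask.compute(add.add(1))[0])  # expected: 5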