I have a dask delayed function that uses some options defined in another submodule. There's also a third module that modifies these options when imported.
If the imports happen after `if __name__ == '__main__':` (in a notebook, for example), running the function on a distributed client ignores the modified options.
Is there a way to make sure the client workers have done the same "imports" as the main thread before running any computation?
Here's an MWE; it uses three Python modules:
`constants.py`:

```python
N = 1
```
`add.py`:

```python
import dask
import constants as c


@dask.delayed
def add(da):
    return da + c.N
```
`overhead.py`:

```python
import constants

constants.N = 4
```
Then if I run the following script, it works (output is `5`):

```python
from dask.distributed import Client
import dask
import add
import overhead

if __name__ == '__main__':
    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1))[0])
```
But if we import the submodules after `if __name__ == '__main__':`, it fails (output is `2`):

```python
from dask.distributed import Client
import dask

if __name__ == '__main__':
    import add
    import overhead

    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1))[0])
```
Another working solution is to trigger some code related to the `overhead` module, so simply:

```python
from dask.distributed import Client
import dask

if __name__ == '__main__':
    import add
    import overhead

    c = Client(n_workers=2, threads_per_worker=2)
    c.run(lambda: overhead)
    print(dask.compute(add.add(1))[0])
```
This works, but it requires me to know which module should be "triggered". Is there a more generic solution?
Of course, this is a simplified example. For context, my real-life issue is using `intake-esm` to read files (the culprit function is `intake_esm.source._open_dataset`). I have another package that calls `intake_esm.utils.set_options` upon import. That option change is not respected when I run the workflow in a notebook, but it works if I run it as a script (with all imports at the top of the file).
When you start the dask distributed cluster, new subprocesses are created. Unless you use `fork` (&), this involves reimporting the current script, with a name other than `__main__`. This is how the `if __name__ == '__main__':` block prevents each worker process from also trying to create clusters-within-clusters. So, by placing your imports within the `if` block, you stop the workers from executing them.
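You can see this from the client by asking each worker process whether `overhead` was ever imported there. A small illustrative check (the helper name is made up; it uses `Client.run`, which executes a function on every worker):

```python
import sys

def overhead_imported():
    # Runs inside each worker process and reports whether the `overhead`
    # module has ever been imported in that process.
    return "overhead" in sys.modules

# With the imports placed inside the `if` block, this typically reports
# False for every worker address.
print(c.run(overhead_imported))
```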
Dask generally aims to be functional/stateless, so that each function call produces results based only on the arguments it is supplied. If you need to set up process state on the workers (such as executing imports or setting config), you should use the preload plugin: https://docs.dask.org/en/stable/how-to/customize-initialization.html. You could also attempt to customise the serialisation of your arguments to include dynamic state, perhaps as a closure, but these are advanced topics.
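For the MWE above, a minimal sketch of the preload approach (assuming the default local cluster and the module names from the question; the worker preload setting is the one described on the page linked above) could look like:

```python
from dask.distributed import Client
import dask

if __name__ == '__main__':
    import add
    import overhead

    # Ask every worker process to import `overhead` at startup, so its
    # module-level side effect (constants.N = 4) also happens on the workers.
    dask.config.set({"distributed.worker.preload": ["overhead"]})

    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1))[0])  # expected output: 5
```

The worker CLI also accepts a `--preload` option, and the same module name can be listed in the distributed configuration file instead of being set programmatically.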
&: `fork` is not recommended because of the various other problems it causes.
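If you did want to experiment with `fork` anyway (again, not recommended), a sketch assuming a local cluster and the `distributed.worker.multiprocessing-method` config key would be:

```python
import dask
from dask.distributed import Client

if __name__ == '__main__':
    import add
    import overhead

    # Forked worker processes inherit the parent's already-imported modules,
    # so the `overhead` side effect is present in the workers. Fork can
    # deadlock with threads and some libraries, which is why it is discouraged.
    dask.config.set({"distributed.worker.multiprocessing-method": "fork"})

    c = Client(n_workers=2, threads_per_worker=2)
    print(dask.compute(add.add(1))[0])  # expected output: 5
```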