I am seeking to better understand the following behavior when using dask.delayed
to call a function that depends on parameters. The issue seems to arise when parameters are specified in a parameters file read by configparser. Here is a complete example:
parameter file:
#zpar.ini: parameter file for configparser
[my pars]
my_zpar = 2.
parser:
#zippy_parser
import configparser
def read(_rundir):
global rundir
rundir = _rundir
cp = configparser.ConfigParser()
cp.read(rundir + '/zpar.ini')
#[my pars]
global my_zpar
my_zpar = cp['my pars'].getfloat('my_zpar')
and the main python file:
# dask test with configparser
import dask
from dask.distributed import Client
import zippy_parser as zpar
def my_func(x, y):
# print stuff
print("parameter from main is: {}".format(main_par))
print("parameter from configparser is: {}".format(zpar.my_zpar))
# do stuff
return x + y
if __name__ == '__main__':
client = Client(n_workers = 4)
#read parameters from input file
rundir = '/path/to/parameter/file'
zpar.read(rundir)
#test zpar
print("zpar is {}".format(zpar.my_zpar))
#define parameter and call my_func
main_par = 5.
z = dask.delayed(my_func)(1., 2.)
z.compute()
client.close()
The first print statement in my_func() executes just fine, but the second print statement raises an exception. The output is:
zpar is 2.0 parameter from main is: 5.0 distributed.worker - WARNING - Compute Failed Function: my_func args: (1.0, 2.0) kwargs: {} Exception: AttributeError("module 'zippy_parser' has no attribute 'my_zpar'",)
I am new to dask. I suppose this has something to do with the serialization, which I do not understand. Can someone enlighten me and/or point to relevant documentation? Thanks!
I will try to keep this brief.
When a function is serialised in order to be sent to workers, python also sends local variables and functions needed by the function (its "closure"). However, it stores the modules it references by name, it does not try to serialise your whole runtime.
This means that zippy_parser
is imported in the worker, not deserialised. Since the function read
has never been called
in the worker, the global
variable is never initialised.
So, you could call read
in the workers as part of your function or otherwise, but probably the pattern or setting module-global variables from with a function isn't great. Dask's delayed mechanism prefers functional purity, that the result you get should not depend on the current state of the runtime.
(note that if you had created the client after calling read
in the main script, the workers might have got the in-memory version, depending on how subprocesses are configured to be created on your system)