python python-3.x dask dask-distributed

How to update settings on dask workers' container which were initially loaded through a register worker callback?


I have 2 containers with Dask workers set up. I'll refer to them as:

  • main, which has 4 workers and the full application
  • remote, which has another 4 workers and the full app's code, but runs only the worker code, not the full app

I'm passing a dictionary with settings to each worker from the main container through a setup function callback, using register_worker_callbacks().

I do it like so:

await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))

And the setup function

def setup_worker(log_config, settings_object):
    setup_logging(log_config)
    settings.__dict__.update(settings_object.__dict__)

Both containers have this piece of code so it works fine, no problems.

BUT I also have a cron job running daily which retrieves a new settings file (in JSON) from an external source, and the settings object on the main container is reloaded / updated from it.

This newly updated settings object needs to propagate to the workers and update their settings too. Basically, I need a way to run the last line of the setup_worker function again, doing the exact same thing. But since the workers are already registered and connected at this point, I can't reuse the same callback, can I? How would I achieve the same?


Solution

  • It worked simply by using

    await client.run(func, args)
    

    as documented in the Dask distributed API reference for Client.run.

    It takes a callable and runs it immediately on all currently connected workers, outside the task scheduling system. So to update the same settings that are normally loaded by the callback, you can just call the same function with the same arguments here.

    So I ended up with 2 functions like so:

    def setup_worker(log_config, settings_object):
        setup_logging(log_config)
        update_settings(settings_object)
    
    def update_settings(settings_object):
        settings.__dict__.update(settings_object.__dict__)
    

    The setup callback is registered the same way as before:

    await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))
    

    But I have an additional function being called when the settings are updated on the main container like so:

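    # Must be a coroutine to use await, and needs its own name so it does not
    # shadow the worker-side update_settings() defined above.
    async def reload_settings():
        ...
        await client.run(update_settings, settings)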
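
One thing to watch with `settings.__dict__.update(...)`: it only adds or overwrites attributes, so a key that was removed from the new JSON file would stay set on the workers. Below is a minimal sketch of a variant of the worker-side function that also drops stale keys. It assumes the settings object is a plain attribute container and that the new values arrive as a dict. The `SimpleNamespace` stand-in, the attribute names, and the return value are hypothetical and only there for illustration.

```python
import json
from types import SimpleNamespace

# Stand-in for the application's module-level settings object (hypothetical).
settings = SimpleNamespace(log_level="INFO", timeout=30, legacy_flag=True)


def update_settings(new_values: dict) -> list:
    """Update the global settings object in place and drop stale attributes.

    The object identity is kept, so code that already holds a reference to
    `settings` sees the new values. Returns the sorted attribute names so the
    caller can check what the settings look like afterwards.
    """
    stale = set(settings.__dict__) - set(new_values)
    settings.__dict__.update(new_values)  # add or overwrite current keys
    for key in stale:                     # remove keys missing from the new file
        delattr(settings, key)
    return sorted(settings.__dict__)


# Simulates the JSON fetched by the daily job; `legacy_flag` is no longer present.
new_values = json.loads('{"log_level": "DEBUG", "timeout": 60}')
remaining = update_settings(new_values)
```

On the main container this would be sent out the same way as above, e.g. `await client.run(update_settings, new_values)`. Since `client.run` returns a dict keyed by worker address, the returned name lists can be compared to confirm every worker ended up with the same settings.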