Tags: python, dask, dask-distributed

Initializing task module global in dask worker using --preload?


I was trying to achieve something similar to these questions (Initializing state on dask-distributed workers, Setting up Dask worker with variable), where I have a (relatively) large model that I want to pre-initialize on a subset of workers that will accept tasks that require the model. Ideally, I don't want the client machine to even have the model.

My initial attempt, before finding those questions, was to define a delayed task in a shared module, worker_tasks.model, and assign a module-level global (e.g. worker_tasks.model.model) in the workers' --preload script for the task to use. However, this didn't work: the variable gets set in the preload script, but is still None when the task runs.

init_model_worker.py:

import logging
from uuid import uuid4

from worker_tasks import model


def dask_setup(worker):
    # Assign the module-level global that compute_clinical reads.
    model.model = f'<mock model {uuid4()}>'

    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')

worker_tasks/model.py:

import logging
import random
from time import sleep
from uuid import uuid4

import dask

# Set by the --preload script (init_model_worker.py) at worker startup.
model = None


@dask.delayed
def compute_clinical(inp):        
    if model is None:
        raise RuntimeError('Model not initialized.')

    # Simulate a long-running model inference.
    sleep(random.uniform(3, 17))

    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
    }
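
For context, the client-side submission looks roughly like the sketch below; the question doesn't show the client code, so the script itself is an assumption (the scheduler address and the 'mock' argument are taken from the log further down):

from dask.distributed import Client

from worker_tasks.model import compute_clinical

# worker_tasks.model.model stays None on the client; only the delayed
# task definition is needed here to build and submit the graph.
client = Client('tcp://scheduler:8786')

task = compute_clinical('mock')
future = client.compute(task)
print(future.result())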

This is the worker log when I start it and submit something to the scheduler:

> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.28.0.4:41743'                         
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging      
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>                   
distributed.worker - INFO -       Start worker at:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -          Listening to:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -              nanny at:           172.28.0.4:41743                       
distributed.worker - INFO -              bokeh at:           172.28.0.4:37766                       
distributed.worker - INFO - Waiting to connect to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -               Threads:                          4                       
distributed.worker - INFO -                Memory:                    1.93 GB                       
distributed.worker - INFO -       Local Directory:           /worker-mhozo9ru                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -         Registered to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.core - INFO - Starting established connection                                           
distributed.worker - WARNING -  Compute Failed                                                      
Function:  compute_clinical                                                                         
args:      ('mock')                                                                                 
kwargs:    {}                                                                                       
Exception: RuntimeError('Model not initialized.')                                                   

You can see that after the preload script loads, the model is <mock model faa41af0-d925-46ef-91c9-086093d37c71>; but when the task reads it, it is still None.

I will try to implement a solution based on the answers to those other questions, but I have a couple of questions about worker --preload:

  1. Why is the model None when I call the task, after I assigned it in the preload script?
  2. Is it generally recommended to avoid doing things like this in the worker --preload script? Is it better to invoke initialization of worker state from the client? If so, why?

Solution

  • I suspect that the model variable is bound into your function at serialization time by however Python serializes functions (likely cloudpickle). You might try looking it up at call time instead, so the import resolves on the worker after the preload script has run:

    @dask.delayed
    def compute_clinical(inp):
        # Re-import at call time so the lookup sees the value assigned by
        # the preload script, not whatever was captured at definition time.
        from worker_tasks.model import model

        if model is None:
            raise RuntimeError('Model not initialized.')

    Or, rather than assigning variables at module-global scope (which can be difficult to reason about in Python), perhaps try attaching the model to the worker object itself:

    from uuid import uuid4

    import dask
    from dask.distributed import get_worker

    # In the --preload script:
    def dask_setup(worker):
        worker.model = f'<mock model {uuid4()}>'

    # In the task module:
    @dask.delayed
    def compute_clinical(inp):
        if get_worker().model is None:
            raise RuntimeError('Model not initialized.')
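
    Note that get_worker() only works from code that is already running on a worker, such as the body of a task, so it can't be called on the client. And if a task ever lands on a worker that was started without the --preload script, the model attribute won't exist at all; a getattr with a default is a slightly more defensive sketch of the same check:

    import dask
    from dask.distributed import get_worker

    @dask.delayed
    def compute_clinical(inp):
        # getattr() with a default also covers workers whose preload never
        # ran, where worker.model was never assigned.
        model = getattr(get_worker(), 'model', None)
        if model is None:
            raise RuntimeError('Model not initialized.')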