I was trying to achieve something similar to these questions (Initializing state on dask-distributed workers, Setting up Dask worker with variable), where I have a (relatively) large model that I want to pre-initialize on a subset of workers that will accept tasks that require the model. Ideally, I don't want the client machine to even have the model.
My initial attempt, before finding those questions, was to define a delayed task in a shared module, worker_tasks.model, and assign a module-level global (e.g. worker_tasks.model.model) in the workers' --preload script for that task to use. However, this didn't work for some reason: the variable gets set in the preload script, but is still None when the task is called.
init_model_worker.py:

import logging
from uuid import uuid4

from worker_tasks import model


def dask_setup(worker):
    model.model = f'<mock model {uuid4()}>'
    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')
worker_tasks/model.py:

import logging
import random
from time import sleep
from uuid import uuid4

import dask

model = None


@dask.delayed
def compute_clinical(inp):
    if model is None:
        raise RuntimeError('Model not initialized.')
    sleep(random.uniform(3, 17))
    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
    }
This is the worker log when I start it and submit something to the scheduler:
> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file
distributed.nanny - INFO - Start Nanny at: 'tcp://172.28.0.4:41743'
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging
distributed.utils - INFO - Reload module init_model_worker from .py file
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>
distributed.worker - INFO - Start worker at: tcp://172.28.0.4:37973
distributed.worker - INFO - Listening to: tcp://172.28.0.4:37973
distributed.worker - INFO - nanny at: 172.28.0.4:41743
distributed.worker - INFO - bokeh at: 172.28.0.4:37766
distributed.worker - INFO - Waiting to connect to: tcp://scheduler:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 1.93 GB
distributed.worker - INFO - Local Directory: /worker-mhozo9ru
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://scheduler:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - WARNING - Compute Failed
Function: compute_clinical
args: ('mock')
kwargs: {}
Exception: RuntimeError('Model not initialized.')
You can see that after reloading the preload script, the model is <mock model faa41af0-d925-46ef-91c9-086093d37c71>; but when I try to read it from the task, I get None.
I will try to implement a solution based on the answers to the other questions, but I have several questions related to worker preload:

1. Why is the variable None when I call the task, after I assigned it in the preload script?
2. Is it appropriate to initialize worker state in a --preload script? Is it better to invoke initialization of worker state from the client? If so, why?

I suspect that the model variable is bundled into your function immediately by however Python serializes functions. You might try this instead:
@dask.delayed
def compute_clinical(inp):
    from worker_tasks.model import model
    if model is None:
        raise RuntimeError('Model not initialized.')
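The difference can be seen without Dask at all: a name imported once keeps the value it had at import time, while a fresh import inside the function re-reads the module attribute at call time and sees later assignments. A minimal sketch, using a throwaway stand-in module (fake_model here is hypothetical, standing in for worker_tasks.model):

```python
import sys
import types

# Build a throwaway module and register it so it is importable.
mod = types.ModuleType('fake_model')
mod.model = None
sys.modules['fake_model'] = mod

from fake_model import model as early  # binds the *current* value: None

mod.model = '<mock model>'  # later assignment, as in a preload script


def task():
    from fake_model import model  # re-reads the module attribute at call time
    return model


print(early)   # None
print(task())  # <mock model>
```

The early binding never sees the update, which mirrors the task observing None even though the preload script assigned the global.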
Or, rather than assigning variables at module scope (which can be difficult to reason about in Python), perhaps try assigning the model to the worker itself instead.
from uuid import uuid4

import dask
from dask.distributed import get_worker


# In the --preload script:
def dask_setup(worker):
    worker.model = f'<mock model {uuid4()}>'


# In the shared task module:
@dask.delayed
def compute_clinical(inp):
    if get_worker().model is None:
        raise RuntimeError('Model not initialized.')
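The mechanics of this pattern can be sketched without a running cluster: attribute access happens at call time, so the task always sees whatever state the preload attached to the worker object. Here FakeWorker is a hypothetical stand-in for the distributed.Worker that get_worker() would return inside a real task:

```python
class FakeWorker:
    """Hypothetical stand-in for distributed.Worker."""
    model = None


worker = FakeWorker()


def compute_clinical(inp):
    # Attribute lookup happens at call time, so it sees current worker state.
    if worker.model is None:
        raise RuntimeError('Model not initialized.')
    return {'input': inp, 'model': worker.model}


# Before the preload-style assignment, the guard fires.
try:
    compute_clinical('mock')
except RuntimeError as e:
    print(e)  # Model not initialized.

# After assignment, the same function sees the model.
worker.model = '<mock model>'
print(compute_clinical('mock')['model'])  # <mock model>
```

Nothing is captured at definition time; the function only holds a reference to the worker object, whose state can change after the preload runs.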