Tags: pandas, dask, dask-distributed, dask-delayed, dask-kubernetes

How does Dask execute code on multiple VMs in the cloud?


I wrote a program with Dask and delayed, and now I want to run it on several machines in the cloud. But there's one thing I don't understand: how does Dask run the code on multiple machines in the cloud without having all of the code's dependencies available there?


Solution

  • When running on multiple machines, Dask workers must have access to all required dependencies in order to run your code.

    You have labelled your question with dask-kubernetes, so I'll use that as an example. By default, dask-kubernetes uses the daskdev/dask Docker image to run your workers. This image contains Python and the minimal dependencies needed to run Dask distributed.

    If your code requires an external dependency, you must ensure it is installed in the image. The Dask Docker image supports installing extra packages at runtime by setting the EXTRA_APT_PACKAGES, EXTRA_CONDA_PACKAGES or EXTRA_PIP_PACKAGES environment variables.

    # worker-spec.yml
    
    kind: Pod
    metadata:
      labels:
        foo: bar
    spec:
      restartPolicy: Never
      containers:
      - image: daskdev/dask:latest
        imagePullPolicy: IfNotPresent
        args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
        name: dask
        env:
          - name: EXTRA_APT_PACKAGES
            value: packagename  # Some package to install with `apt install`
          - name: EXTRA_PIP_PACKAGES
            value: packagename  # Some package to install with `pip install`
          - name: EXTRA_CONDA_PACKAGES
            value: packagename  # Some package to install with `conda install`
        resources:
          limits:
            cpu: "2"
            memory: 6G
          requests:
            cpu: "2"
            memory: 6G
    
    from dask_kubernetes import KubeCluster
    
    cluster = KubeCluster.from_yaml('worker-spec.yml')
    
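    Once the cluster exists you use it like any other Dask cluster. The snippet below is a minimal sketch, not part of the original answer: the scale call, the Client, and the toy inc function are only illustrative, assuming your delayed code and its dependencies are importable on the workers.

    from dask.distributed import Client
    import dask

    # `cluster` was created from worker-spec.yml above
    cluster.scale(3)          # request three worker pods
    client = Client(cluster)  # connect the scheduler client to the cluster

    @dask.delayed
    def inc(x):
        # toy task standing in for your real delayed functions
        return x + 1

    # build and execute a small delayed graph on the Kubernetes workers
    total = dask.delayed(sum)([inc(i) for i in range(10)])
    print(total.compute())
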

    The downside of installing packages at runtime is that it happens every time a worker starts, which can make adaptive scaling slow. Alternatively, you can build your own Docker image with all your dependencies already installed, publish it to Docker Hub, and then use that image in your configuration instead.

    kind: Pod
    metadata:
      labels:
        foo: bar
    spec:
      restartPolicy: Never
      containers:
      - image: me/mycustomimage:latest
        imagePullPolicy: IfNotPresent
        args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
        name: dask
        resources:
          limits:
            cpu: "2"
            memory: 6G
          requests:
            cpu: "2"
            memory: 6G
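
    As with the first spec, point KubeCluster at the YAML file. This is a sketch: the filename custom-worker-spec.yml is just a placeholder for wherever you save the spec above.

    from dask_kubernetes import KubeCluster

    # placeholder filename for the custom-image spec shown above
    cluster = KubeCluster.from_yaml('custom-worker-spec.yml')
    cluster.scale(3)  # workers start faster because dependencies are baked into the image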