Tags: google-kubernetes-engine, dask, kubernetes-helm, portforwarding

Installing dask on GKE with ClusterIP scheduler and port forwarding - TCP connection times out when creating a dask.distributed client in Python


I tried to deploy dask on a GKE cluster using helm and tiller. I do not have permission to create external IPs for my cluster, so I set up the dask scheduler as a ClusterIP service instead of a LoadBalancer. I then used kubectl port forwarding to forward the dask scheduler service to my local machine.

I have a Python script (a simple dask example) in which I want to connect the distributed client to the (forwarded) TCP port of the dask scheduler and launch computations on my GKE cluster. However, I get a timeout on the TCP connection.

I am wondering where I could have gone wrong in this process, or whether I need to enable more permissions in my GCP account to get this to work. Any pointers you can provide are greatly appreciated. Please note that I was able to open the Jupyter notebook on the forwarded HTTP port and trigger some computations on the default 3 workers that dask created on my GKE cluster.

Below I am pasting the Python code I ran, the error I see, the current status of the pods, nodes, and services set up for dask on GKE, and the commands I used to set up dask on my GCP GKE cluster.

My sample Python program (dask-example.py):

#!/usr/bin/env python3

from dask.distributed import Client
import dask.array as da

client = Client('tcp://127.0.0.1:8080')

array = da.ones((1000, 1000, 1000))

mn = array.mean().compute()  # Should print 1.0

print(mn)

Error message (on running python3 dask-example.py):

Traceback (most recent call last):
  File "/home/userenv/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/usr/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "trial.py", line 17, in <module>
    client = Client('tcp://127.0.0.1:8080', timeout=10)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 743, in __init__
    self.start(timeout=timeout)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 948, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/userenv/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File "/home/userenv/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
  File "/home/userenv/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 1038, in _start
    await self._ensure_connected(timeout=timeout)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 1095, in _ensure_connected
    comm = await connect(
  File "/home/userenv/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tcp://127.0.0.1:8080 after 10 s

Status of dask on my GKE cluster:

kubectl get po #shows this

NAME                                       READY   STATUS              RESTARTS   AGE
my-dask-jupyter-565c5c5659-w4s76           1/1     Running             0          27h
my-dask-scheduler-6bf8bc8bbf-xgj2q         1/1     Running             0          27h
my-dask-worker-68b5b695bd-l2b6m            1/1     Running             0          27h
my-dask-worker-68b5b695bd-xnssz            1/1     Running             0          27h
my-dask-worker-68b5b695bd-z68wt            1/1     Running             0          27h

kubectl get no #shows this

NAME                                          STATUS   ROLES    AGE   VERSION
gke-dask-cluster-default-pool-d3f451b1-gp47   Ready    <none>   27h   v1.17.14-gke.1200
gke-dask-cluster-default-pool-d3f451b1-rk8z   Ready    <none>   27h   v1.17.14-gke.1200

kubectl get svc #shows this

NAME                TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)           AGE
kubernetes          ClusterIP   10.83.240.1    <none>        443/TCP           27h
my-dask-jupyter     ClusterIP   10.83.244.84   <none>        80/TCP            27h
my-dask-scheduler   ClusterIP   10.83.244.59   <none>        8786/TCP,80/TCP   27h

Commands I used to set up dask (thanks to this post: https://libinruan.github.io/2019/05/24/Set-up-Kubernetes-clusters-for-Python-ML/):

export PROJECTID='mygcp'
export EMAIL="[email protected]"
export ZONE='us-central1-c'
export REGION='us-central1' 
export MIN_WORKER_NODES=0
export MAX_WORKER_NODES=100
export CLUSTER_NAME='dask-cluster'
export WORKER_MACHINE_TYPE='n1-standard-2'
export MACHINE_TYPE='n1-standard-2'
NUM_NODES=2
 
gcloud config set project $PROJECTID

gcloud services enable container.googleapis.com

gcloud container clusters create $CLUSTER_NAME --machine-type $MACHINE_TYPE --num-nodes $NUM_NODES --zone $ZONE --cluster-version latest

gcloud container clusters get-credentials $CLUSTER_NAME --zone=$ZONE --project $PROJECTID

kubectl config set-cluster $CLUSTER_NAME

kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=$EMAIL

kubectl create serviceaccount tiller --namespace=kube-system

kubectl create clusterrolebinding tiller --clusterrole=cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller --wait

kubectl --namespace=kube-system patch deployment tiller-deploy --type=json \
--patch='[{"op": "add", "path": "/spec/template/spec/containers/0/command", "value": ["/tiller", "--listen=localhost:44134"]}]'

gcloud container clusters get-credentials $CLUSTER_NAME --zone=$ZONE --project $PROJECTID

kubectl config set-cluster $CLUSTER_NAME

helm install -n my-dask stable/dask -f dask-worker-spec.yml --set scheduler.serviceType=ClusterIP --set jupyter.serviceType=ClusterIP


Solution

  • I've managed to reproduce the same behavior as described in the question.

    I encourage you to check the newer version of dask, which is available on its homepage, as the one used in the question is deprecated.

    TL;DR

    Use the same versions of the Python modules (dask, distributed, etc.) that the dask-scheduler is using!
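
    If the client can connect at all, distributed provides a direct way to verify this: Client.get_versions() reports the package versions seen by the client, the scheduler and the workers, and check=True flags mismatches. Below is a minimal sketch (not part of the original answer); the address assumes the scheduler is forwarded to local port 8786, so adjust it to your setup.

    #!/usr/bin/env python3
    # Sketch: compare the local dask/distributed versions with the scheduler's.
    import dask
    import distributed
    from distributed import Client

    # Locally installed versions -- these should match the scheduler image
    # (the daskdev/dask:1.1.5 tag corresponds to dask 1.1.5).
    print("local dask:       ", dask.__version__)
    print("local distributed:", distributed.__version__)

    # Assumes the scheduler is reachable on a locally forwarded port 8786.
    client = Client('tcp://127.0.0.1:8786')

    # get_versions(check=True) raises or warns when the client, scheduler
    # and worker versions are inconsistent.
    print(client.get_versions(check=True))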


    Reproduction of the setup used in the question

    • Create GKE cluster:
      • $ gcloud beta container clusters create "gke-dask" --zone "europe-west3-c" --no-enable-basic-auth --cluster-version "1.17.13-gke.2600" --release-channel "regular" --machine-type "e2-standard-4"

    e2-standard-4 = 4 cores and 16GB RAM machine

    To identify the issue I ran some tests:

    • $ kubectl port-forward the dask-scheduler to my machine and run the example Python code from the question (see also the raw TCP connectivity sketch after this list). I installed the dependencies this code needed, but in different versions ($ pip install "dask[complete]"). The code failed with the same message as in the question.
    • $ kubectl run -it --rm ubuntu --image=ubuntu -- /bin/bash - the error from the question indicated that it could be a network-related problem. I used an ubuntu Pod with the same Python installation to eliminate potential network connectivity issues (port-forward). The code failed with the same message as in the question.
    • $ kubectl exec -it DASK-SCHEDULER-POD-NAME -- /bin/bash - if this were a network connectivity issue, the code should run on the Pod that is supposed to handle it. The code ran successfully and returned 1.0.
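
    A raw TCP connection attempt (a hypothetical extra check, not one of the tests above) can help separate the two cases: if even a plain socket cannot reach the forwarded port, the problem is the forwarding itself; if it can, the failure lies in dask's own handshake (for example, mismatched versions). The host/port below assume the forwarding used in the question.

    #!/usr/bin/env python3
    # Plain TCP connectivity check against the forwarded scheduler port.
    import socket

    ADDRESS = ("127.0.0.1", 8080)  # adjust to your forwarded host/port

    try:
        sock = socket.create_connection(ADDRESS, timeout=5)
        print("TCP connection to %s:%s succeeded" % ADDRESS)
        sock.close()
    except OSError as exc:
        print("TCP connection to %s:%s failed: %s" % (ADDRESS + (exc,)))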

    I tried to use an ubuntu Pod with Python and its modules closer in version to the ones in the dask-scheduler. It produced an error different from the one in the question. This indicates that there is no inherent issue with the networking, but with the software used (its versions). I couldn't exactly replicate the setup used in the dask-scheduler, so I used the image that the dask-scheduler is using and spawned an additional Pod to check whether it would work through the GKE network. It worked!

    Please consider the example below to be a workaround!

    Steps to run from your machine:

    • $ kubectl port-forward service/my-dask-scheduler --address 0.0.0.0 8786:8786 & - forward the dask scheduler service to your machine
    • $ docker run --rm -ti daskdev/dask:1.1.5 /bin/bash - run a Docker container on your machine and exec into its bash. This image is the same as your dask-scheduler's, to keep the Python and module versions identical.
    • Use the following Python code inside the container:
    #!/usr/bin/env python3
    from dask.distributed import Client
    import dask.array as da

    client = Client('tcp://IP_ADDRESS:8786')

    array = da.ones((1000, 1000, 1000))
    mn = array.mean().compute()  # Should print 1.0
    print(mn)
    

    Please replace "IP_ADDRESS" with the IP address of your machine that is accessible from your Docker container!
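
    If you are unsure which of your machine's addresses is reachable from the container, one common trick (a sketch, not part of the original answer) is to let the operating system pick the outgoing interface and read its address back:

    #!/usr/bin/env python3
    # Print the host's primary outbound IP address.
    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # "Connecting" a UDP socket sends no packets, but the OS selects the
    # outgoing interface, whose address getsockname() then reports.
    s.connect(("8.8.8.8", 80))
    print(s.getsockname()[0])
    s.close()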

    You should get the following output:

    (base) root@81fb5004ea4c:/# python3 script.py 
    1.0
    

    You could look into Python venv for creating a virtual environment with module versions matching the scheduler.
