Tags: python, kubernetes, pyspark

Running a PySpark job on a Kubernetes Spark cluster


I am trying to run a Spark job against a separate Spark master hosted on Kubernetes, but port forwarding reports the following error:

E0206 19:52:24.846137   14968 portforward.go:400] an error occurred forwarding 7077 -> 7077: error forwarding port 7077 to pod 1cf922cbe9fc820ea861077c030a323f6dffd4b33bb0c354431b4df64e0db413, uid : exit status 1: 2022/02/07 00:52:26 socat[25402] E connect(16, AF=2 127.0.0.1:7077, 16): Connection refused

My setup is:

  • I am using VS Code with a dev container to manage a setup where I can run Spark applications. I can run local Spark jobs when I build my context like so: sc = pyspark.SparkContext(appName="Pi")
  • My host computer is running Docker Desktop with Kubernetes enabled, and I used Helm to install the Spark release from Bitnami: https://artifacthub.io/packages/helm/bitnami/spark
  • The VS Code dev container can reach the host correctly, since curl host.docker.internal:80 returns the Spark web UI status page. Port 80 is forwarded from the host using kubectl port-forward --namespace default --address 0.0.0.0 svc/my-release-spark-master-svc 80:80
  • I am also forwarding port 7077 with a similar command: kubectl port-forward --address 0.0.0.0 svc/my-release-spark-master-svc 7077:7077 (a quick sanity check of both forwarded ports is sketched after this list).
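
A minimal sketch for probing the two forwarded ports from inside the dev container (my own quick check, not part of the original setup; host.docker.internal and the ports are the ones listed above):

import socket

# Probe both forwarded ports from inside the dev container. Note that a
# successful TCP connect to 7077 only proves kubectl is listening on the
# forwarded port; the "Connection refused" in the error above happens one
# hop later, between kubectl and the pod.
for port in (80, 7077):
    try:
        with socket.create_connection(("host.docker.internal", port), timeout=5):
            print(f"port {port}: TCP connect OK")
    except OSError as exc:
        print(f"port {port}: {exc}")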

When I create a Spark context like this: sc = pyspark.SparkContext(appName="Pi", master="spark://host.docker.internal:7077"), I expect Spark to submit jobs to that master. I don't know much about Spark, but I have seen a few examples creating a context like this.

When I run the code, I see connection attempts failing at port 7077 of the Kubernetes port forwarding, so the requests are going through but they are being refused somehow:

Handling connection for 7077
E0206 19:52:24.846137   14968 portforward.go:400] an error occurred forwarding 7077 -> 7077: error forwarding port 7077 to pod 1cf922cbe9fc820ea861077c030a323f6dffd4b33bb0c354431b4df64e0db413, uid : exit status 1: 2022/02/07 00:52:26 socat[25402] E connect(16, AF=2 127.0.0.1:7077, 16): Connection refused

Now, I have no idea why the connections are being refused. I know the Spark server is accepting requests because I can see the web UI from within the dev container. I know that the Spark service exposes port 7077 because I can run:

$ kubectl get services
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)           AGE
kubernetes                    ClusterIP   10.96.0.1        <none>        443/TCP           28h
my-release-spark-headless     ClusterIP   None             <none>        <none>            7h40m
my-release-spark-master-svc   ClusterIP   10.108.109.228   <none>        7077/TCP,80/TCP   7h40m

Can anyone tell me why the connections are refused and how I can configure the Spark master to accept jobs from external callers?

The example code I am using is:

import findspark
findspark.init()

import pyspark
import random

#sc = pyspark.SparkContext(appName="Pi", master="spark://host.docker.internal:7077")
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000

def inside(p):
  x, y = random.random(), random.random()
  return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples
print(pi)

sc.stop()

Solution

  • After tinkering with it a bit more, I noticed this output when launching the Helm chart for Apache Spark: ** IMPORTANT: When submit an application from outside the cluster service type should be set to the NodePort or LoadBalancer. **

    This led me to research Kubernetes networking a bit more. To submit a job, it is not sufficient to forward port 7077; the cluster itself needs an externally reachable IP. This requires installing the Helm chart with the following command to set the service values: helm install my-release --set service.type=LoadBalancer --set service.loadBalancerIP=192.168.2.50 bitnami/spark. The IP above is my host's address and is reachable by the Docker container.

    With the LoadBalancer IP assigned, Spark runs the example code provided once the master URL points at that address (see the sketch below).

    Recap: don't use port forwarding to submit jobs; the Spark master service needs an externally reachable IP (NodePort or LoadBalancer).
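
For reference, a minimal sketch of the only change needed in the example code, assuming the LoadBalancer IP 192.168.2.50 set in the helm install command above:

import pyspark

# Point the driver at the LoadBalancer IP assigned to the Spark master service
# (192.168.2.50 here, from the helm install command above); 7077 is the master
# port exposed by my-release-spark-master-svc.
sc = pyspark.SparkContext(appName="Pi", master="spark://192.168.2.50:7077")

The rest of the Pi example runs unchanged against this context.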