I'm having a strange issue with spark-submit processes hanging indefinitely and leaking memory after their jobs have finished.
Exactly 3 spark-submit processes are left hanging, corresponding to the first 3 jobs that were submitted to the cluster in client mode. Example from the client:
root 1517 0.3 4.7 8412728 1532876 ? Sl 18:49 0:38 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
root 1746 0.4 3.5 8152640 1132420 ? Sl 18:59 0:36 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
root 2239 65.3 7.8 9743456 2527236 ? Sl 19:10 91:30 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/*:/usr/local/hadoop-2.7.7/etc/hadoop/:/usr/local/hadoop-2.7.7/share/hadoop/common/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/common/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/hdfs/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/yarn/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.7.7/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar -Xmx2g org.apache.spark.deploy.SparkSubmit --conf spark.driver.port=46101 --conf spark.master=spark://3c520b0c6d6e:7077 --conf spark.scheduler.allocation.file=/home/jovyan/work/spark_scheduler_allocation.xml --conf spark.app.name=REDACTED --conf spark.driver.bindAddress=3c520b0c6d6e --conf spark.fileserver.port=46102 --conf packages=org.apache.kudu:kudu-spark2_2.11:1.12.0 --conf spark.broadcast.port=46103 --conf spark.driver.host=3c520b0c6d6e --conf spark.replClassServer.port=46104 --conf spark.executorEnv.AF_ALERTS_STREAM_KEY=ALERTS_STREAM_LIST --conf spark.scheduler.mode=FAIR --conf spark.shuffle.service.enabled=True --conf spark.blockManager.port=46105 --conf spark.dynamicAllocation.enabled=true pyspark-shell
The corresponding jobs show as 'completed' in the Spark UI and, according to their logs, have closed their sessions and exited. No worker resources are consumed by these jobs anymore, and subsequent jobs receive the maximum number of executors and run as expected. However, these 3 processes keep consuming memory at a slowly growing rate, which eventually leads to an OOM when trying to allocate a new driver.
A thread dump of process 1517 above shows the following user threads (daemon threads omitted):
"Thread-4" #16 prio=5 os_prio=0 tid=0x00007f8fe4008000 nid=0x61c runnable [0x00007f9029227000]
java.lang.Thread.State: RUNNABLE
`at java.net.SocketInputStream.socketRead0(Native Method)`
`at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)`
`at java.net.SocketInputStream.read(SocketInputStream.java:171)`
`at java.net.SocketInputStream.read(SocketInputStream.java:141)`
`at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)`
`at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)`
`at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)`
`- locked <0x00000000800f8a88> (a java.io.InputStreamReader)`
`at java.io.InputStreamReader.read(InputStreamReader.java:184)`
`at java.io.BufferedReader.fill(BufferedReader.java:161)`
`at java.io.BufferedReader.readLine(BufferedReader.java:324)`
`- locked <0x00000000800f8a88> (a java.io.InputStreamReader)`
`at java.io.BufferedReader.readLine(BufferedReader.java:389)`
`at py4j.GatewayConnection.run(GatewayConnection.java:230)`
`at java.lang.Thread.run(Thread.java:748)`
Locked ownable synchronizers:
`- None`
"Thread-3" #15 prio=5 os_prio=0 tid=0x00007f905dab7000 nid=0x61b runnable [0x00007f9029328000]
java.lang.Thread.State: RUNNABLE
`at java.net.PlainSocketImpl.socketAccept(Native Method)`
`at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)`
`at java.net.ServerSocket.implAccept(ServerSocket.java:560)`
`at java.net.ServerSocket.accept(ServerSocket.java:528)`
`at py4j.GatewayServer.run(GatewayServer.java:685)`
`at java.lang.Thread.run(Thread.java:748)`
Locked ownable synchronizers:
`- None`
"pool-1-thread-1" #14 prio=5 os_prio=0 tid=0x00007f905daa5000 nid=0x61a waiting on condition [0x00007f902982c000]
java.lang.Thread.State: TIMED_WAITING (parking)
`at sun.misc.Unsafe.park(Native Method)`
`- parking to wait for <0x000000008011cda8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)`
`at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)`
`at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)`
`at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)`
`at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)`
`at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)`
`at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)`
`at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)`
`at java.lang.Thread.run(Thread.java:748)`
Locked ownable synchronizers:
`- None`
"main" #1 prio=5 os_prio=0 tid=0x00007f905c016800 nid=0x604 runnable [0x00007f9062b96000]
java.lang.Thread.State: RUNNABLE
`at java.io.FileInputStream.readBytes(Native Method)`
`at java.io.FileInputStream.read(FileInputStream.java:255)`
`at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)`
`at java.io.BufferedInputStream.read(BufferedInputStream.java:265)`
`- locked <0x0000000080189dc8> (a java.io.BufferedInputStream)`
`at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:87)`
`at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)`
`at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)`
`at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)`
`at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)`
`at java.lang.reflect.Method.invoke(Method.java:498)`
`at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)`
`at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)`
`at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)`
`at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)`
`at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)`
`at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)`
`at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)`
`at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)`
Locked ownable synchronizers:
`- None`
"VM Thread" os_prio=0 tid=0x00007f905c08c000 nid=0x60d runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f905c02b800 nid=0x605 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f905c02d000 nid=0x606 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f905c02f000 nid=0x607 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f905c030800 nid=0x608 runnable
"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f905c032800 nid=0x609 runnable
"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f905c034000 nid=0x60a runnable
"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f905c036000 nid=0x60b runnable
"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f905c037800 nid=0x60c runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f905c0e0800 nid=0x616 waiting on condition
I notice that the main thread is blocked on a file input stream read inside PythonGatewayServer, and the remaining threads appear to be blocked waiting on socket reads or accepts. It looks like some number of Python gateways are being retained for some reason.
Any ideas on what the cause might be?
As @mazaneicha noticed, the spark-submit processes are running a pyspark REPL shell. This is because the jobs are submitted from a Python runtime rather than through the spark-submit script.
For example, consider this class:
from pyspark.sql import SparkSession

class BatchTask:
    def start_job(self):
        spark = None
        try:
            spark = SparkSession.builder.appName('app_name').getOrCreate()
            dataframe = spark.read \
                .format('some.format') \
                .load()
            # ... perform dataframe aggregations ...
            dataframe.collect()
        finally:
            if spark:
                spark.stop()
As it turns out, if you instantiate BatchTask and call start_job in any Python runtime on a machine with pyspark installed and on PYTHONPATH, a spark-submit process is launched running a pyspark REPL, and the application driver executes inside it. This REPL does not exit and is reused for subsequent jobs submitted the same way, accumulating the daemon threads added with each job until it eventually exhausts the driver's memory limit and crashes.
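For illustration, here is a minimal sketch of the kind of caller that triggers this behaviour; the module name and the surrounding process (e.g. a scheduler worker) are hypothetical, not taken from the original setup:

# caller.py -- a plain Python process (e.g. a scheduler worker), NOT started via spark-submit.
# The module name "batch_task" is hypothetical; it is assumed to hold the BatchTask class above.
from batch_task import BatchTask

task = BatchTask()
task.start_job()  # getOrCreate() inside start_job spawns a "spark-submit ... pyspark-shell" JVM child
# The job completes and spark.stop() runs, but the pyspark-shell gateway JVM keeps running
# and is reused by the next start_job() call made from this Python runtime.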
Solution: don't submit Spark applications this way; use the spark-submit script instead.
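A minimal sketch of that approach, assuming the job is repackaged as a standalone script (the file name and submit options below are illustrative):

# batch_job.py -- launched with something like:
#   spark-submit --master spark://<master-host>:7077 batch_job.py
# Submitted this way, the driver runs directly under spark-submit and the JVM exits
# when main() returns, so no pyspark-shell REPL process is left behind between jobs.
from pyspark.sql import SparkSession


def main():
    spark = SparkSession.builder.appName('app_name').getOrCreate()
    try:
        dataframe = spark.read \
            .format('some.format') \
            .load()
        # ... perform dataframe aggregations ...
        dataframe.collect()
    finally:
        spark.stop()


if __name__ == '__main__':
    main()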