I am writing a batch-processing program in Spark using PySpark. The input files and their sizes are as follows:
base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)
These are text files with one record per line; fields are separated by a special character, u'\u0001' (see the code below).
I filter the attribute-link records, group them, and join the result with the other tables.
I am submitting this program via spark-submit to a Hadoop cluster with 9 data nodes managed by Ambari. Each data node has 140 GB of RAM and 3.5 TB of disk space.
Here is my PySpark code:
import sys
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == "__main__":
    sc = SparkContext(appName="Tracks")
    sqlContext = SQLContext(sc)
    # Load base-track
    track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))
    # Load base-attribute-link
    attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))
    # Load base-release
    release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))

    # Keep only rows for attribute MA0000000162, group by field 1 and take the max of field 4
    attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')
    attlnkg = attlnk.groupBy(lambda row: row[1])
    attlnkmax = attlnkg.map(lambda t: (t[0], max([v[4] for v in t[1]])))
    alg = attlnkmax.map(lambda r: Row(al_objectid=r[0], al_value=r[1]))
    aldf = alg.toDF()

    track = track.map(lambda r: Row(
        t_tag=r[0], t_trackid=r[1], t_releaseid=r[2], t_songid=r[3], t_med=r[4],
        t_ph=r[5], t_tn=r[5], t_title=r[5], t_part=r[6], t_dur=r[7], t_pick=r[8],
        t_amgclid=r[9], t_amgpopid=r[10], t_compid=r[11], t_muzid=r[12],
        t_perfid=r[13], t_albumid=r[14]))
    trackdf = track.toDF()

    release = release.map(lambda r: Row(
        r_tag=r[0], r_relid=r[1], r_albumid=r[2], r_mediafmtid=r[3], r_prodfmtid=r[4],
        r_reldate=r[5], r_prodcode=r[6], r_prodtypeid=r[7], r_label=r[8], r_relyear=r[9],
        r_ispurch=r[10], r_amgclassid=r[11], r_amgpopid=r[12], r_eanid=r[13], r_upcid=r[14]))
    releasedf = release.toDF()

    # Join tracks with the aggregated attribute links, then with releases
    trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')
    tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])
    tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph',
                               't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid',
                               't_amgpopid', 't_compid', 't_muzid', 'al_objectid',
                               't_perfid', 't_albumid', 'r_label')

    # Coalesce to 100 partitions and write the result as \u0001-delimited text
    tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")
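As an aside (not part of the fix below): the per-key maximum could also be computed with reduceByKey instead of groupBy plus map, since reduceByKey combines values map-side and generally shuffles and spills much less data. A minimal sketch, assuming the same column positions as above:

# Sketch only: same string-max result as the groupBy + max above, but
# aggregated map-side so less data hits the shuffle scratch directories.
attlnkmax = (attlnk
             .map(lambda row: (row[1], row[4]))
             .reduceByKey(lambda a, b: max(a, b)))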
I got a bunch of errors like the following while the job was executing:
ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
There are a couple of questions on SO, here and here, related to the same problem.
Here is what I have tried based on those two questions. First, I increased spark.yarn.executor.memoryOverhead from 384 MB to 4 GB. Second, I set the Spark local directories through Java opts:
SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"
export SPARK_JAVA_OPTS
The first change did not have any effect. When I added the Java opts, I got errors that the /mnt directories are not present.
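For reference, both settings could also have been expressed through SparkConf inside the script rather than SPARK_JAVA_OPTS. This is just a sketch: the scratch path below is a placeholder that would have to exist (with enough free space) on every node, and the memoryOverhead setting only takes effect when the job actually runs on YARN.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Tracks")
        .set("spark.yarn.executor.memoryOverhead", "4096")   # in MB; only honored on YARN
        .set("spark.local.dir", "/data/spark-tmp"))          # placeholder scratch dir, must already exist
sc = SparkContext(conf=conf)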
After reading about this problem on multiple forums (including Databricks), I got the vague idea that the job is creating temporary files under /tmp on each cluster node as part of the shuffle and exhausting the space there. On each cluster node, 100 GB is allocated to the root (/) partition, on which the /tmp directory resides.
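A quick way to check which scratch location Spark has actually resolved (a sketch; note that under YARN the NodeManager's yarn.nodemanager.local-dirs overrides this setting, so it mainly matters when running locally or standalone):

# Falls back to /tmp when spark.local.dir has not been set anywhere.
print(sc.getConf().get("spark.local.dir", "/tmp"))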
I have been struggling for more than a month to get this job to run by playing with various Spark configuration parameters. As part of the tweaking, I increased spark.driver.memory and spark.executor.memory to 16g and later to 64g, and also increased spark.yarn.executor.memoryOverhead to 4 GB. Unfortunately none of this solved the space issue.
Any guidance on how to proceed further would be of great help.
[Edit-1] I was just checking the disk space of the root partition on all the machines: 7 of the 9 nodes in our cluster have 100+ GB allocated to the root partition, but on 2 nodes only 10 GB is allocated, with only 6+ GB left on them. This might be causing the disk space issue; I will have to check with our IT team whether the root partition can be extended.
[Edit-2] I worked with the IT team to extend the root partition to 100+ GB on all the machines, but the issue still persists; maybe 100 GB of /tmp space is also not sufficient for this job. I estimated the output of this job to be roughly 4.6 GB.
I figured out that I was not submitting the Spark job to the cluster at all, but running it on a single machine (local mode), hence the disk space issues. I had always been submitting my script in the following way:
spark-submit tracks.py
Since I want the script to be executed on the Hadoop cluster with YARN as the resource manager, I changed my submit command to the following, and then it worked fine:
spark-submit --master yarn --deploy-mode cluster tracks.py
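A small sanity check near the top of tracks.py would have caught this much earlier (a sketch; with --deploy-mode cluster the printed output ends up in the YARN application logs rather than on the console):

# Confirm the job is running against YARN rather than in local mode.
print(sc.master)              # "yarn" or "yarn-cluster" on the cluster, "local[*]" otherwise
print(sc.defaultParallelism)  # should reflect the cluster's cores, not a single machine's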