Memory issues running Spark locally in IntelliJ (Scala)


I'm very new to Scala and Spark. I'm trying to write a script that reads several Excel files with the same format (one per year, e.g. 2011.xlsx, 2012.xlsx, etc.) into a single DataFrame. The total amount of data is a modest 350 MB: each file is approximately 30 MB and there are roughly 12 files. However, I keep running into java.lang.OutOfMemoryError exceptions like the ones below:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Spark Context Cleaner"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 0.0 in stage 0.0 (TID 0)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "executor-kill-mark-cleanup"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 8.0 in stage 0.0 (TID 8)"
java.lang.OutOfMemoryError: Java heap space

I am running this code locally using IntelliJ IDEA:

import com.crealytics.spark.excel._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{DataFrame, SparkSession, types}

import java.io.File

object sparkJob extends App {

  val session = SparkSession.builder().
    config("spark.driver.bindAddress", "127.0.0.1").
    config("spark.executor.memory", "8g").
    config("spark.driver.memory", "8g").
    config("spark.memory.offHeap.enabled", true).
    config("spark.memory.offHeap.size", "4g").
    master("local[*]").
    appName("etl").
    getOrCreate()

  val dataSchema = types.StructType(Array(
    StructField("Delivery Date", types.StringType, nullable = false),
    StructField("Delivery Hour", types.IntegerType, nullable = false),
    StructField("Delivery Interval", types.IntegerType, nullable = false),
    StructField("Repeated Hour Flag", types.StringType, nullable = false),
    StructField("Settlement Point Name", types.StringType, nullable = false),
    StructField("Settlement Point Type", types.StringType, nullable = false),
    StructField("Settlement Point Price", types.DecimalType(10, 0), nullable = false)
  ))

  val dir = new File("data/")
  val files = dir.listFiles.map(_.getPath).toList


  def read_excel(filePath: String): DataFrame = {
    session.read.excel(header=true). 
      schema(dataSchema).
      load(filePath)
  }

  val df = files.map(f => read_excel(f))
  val mdf = df.reduce(_.union(_))

  mdf.show(5)
}

Things I've tried:

Setting the -Xmx and -Xms VM options, and increasing the various memory settings in the Spark session config in the code. My machine has 32 GB of RAM, so physical memory isn't the issue.
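
One way to check whether the -Xmx setting actually reached the JVM that runs the job is to print the maximum heap at startup. This is a minimal sketch; HeapCheck and maxHeapMb are hypothetical names that are not part of the original code:

```scala
object HeapCheck {
  // Maximum heap the running JVM may use, in megabytes.
  def maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)

  def main(args: Array[String]): Unit =
    println(s"Max JVM heap: $maxHeapMb MB")
}
```

With master("local[*]") the driver and the executor threads share this one JVM, so a value far below what was requested suggests the VM options were not applied to the run configuration in use. Spark's configuration documentation also notes that in client mode spark.driver.memory must not be set through SparkConf inside the application, because the driver JVM has already started by that point.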


Solution

  • Use parallelize instead of map to read the files in parallel, so that Spark distributes the work across its workers (with local[*], the local cores) rather than reading the files one after another. For example, you can create an RDD from the list of files and then call map on the RDD:

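    val filesRDD = session.sparkContext.parallelize(files)
    // Caveat: read_excel calls the SparkSession, which Spark only supports on the driver,
    // so building DataFrames inside an RDD transformation may fail at runtime.
    val df = filesRDD.map(f => read_excel(f))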
    

  • Use cache to store the DataFrame, so that the data does not have to be re-read from disk every time an action is performed on it:

    val mdf = df.reduce(_.union(_)).cache()
    

  • As a last resort, you can set spark.executor.memory=12g, but I think that is an extreme solution. It may be worth debugging the Excel decoding library to see whether it is responsible for the high memory usage.
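
If the Excel library does turn out to be the source of the memory pressure, one thing that may be worth trying is the maxRowsInMemory option described in the spark-excel README, which switches to a streaming reader for .xlsx files. The sketch below is an assumption-laden illustration, not a confirmed fix: StreamingExcelRead, excelOptions and readExcelStreaming are hypothetical names, and the value 1000 is an arbitrary starting point.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingExcelRead {
  // Reader options; the option names follow the spark-excel README.
  val excelOptions: Map[String, String] = Map(
    "header" -> "true",
    // Assumed value: setting this enables the streaming reader (.xlsx only).
    "maxRowsInMemory" -> "1000"
  )

  // Hypothetical helper: reads one workbook with the streaming reader enabled.
  def readExcelStreaming(session: SparkSession, path: String): DataFrame =
    session.read
      .format("com.crealytics.spark.excel")
      .options(excelOptions)
      .load(path)
}
```

This would take the place of read_excel in the question's code, for example files.map(f => StreamingExcelRead.readExcelStreaming(session, f)).reduce(_.union(_)). The explicit schema from the question can be added with .schema(dataSchema) before load.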