Tags: apache-spark, dataframe, hive, apache-spark-sql, rdd

How to optimize Spark Job processing S3 files into Hive Parquet Table


I am new to distributed development with Spark. I'm attempting to optimize an existing Spark job that takes up to 1 hour to complete.

Infrastructure:

  • EMR [10 instances of r4.8xlarge (32 cores, 244GB)]
  • Source Data: 1000 .gz files in S3 (~30MB each)
  • Spark Execution Parameters [Executors: 300, Executor Memory: 6 GB, Cores: 1] (mapped to config keys in the sketch below)
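
For reference, a minimal sketch of how these parameters map onto standard Spark config keys when building the session. The values simply mirror the list above and the app name is just a placeholder; on EMR these are typically passed via spark-submit or cluster configuration instead:

import org.apache.spark.sql.SparkSession

// Sketch only: values mirror the parameters listed above
val spark = SparkSession.builder()
  .appName("s3-to-hive-parquet") // placeholder name
  .config("spark.executor.instances", "300")
  .config("spark.executor.memory", "6g")
  .config("spark.executor.cores", "1")
  .enableHiveSupport()
  .getOrCreate()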

In general, the Spark job performs the following:

private def processLines(lines: RDD[String]): DataFrame = {
    val updatedLines = lines.mapPartitions(row => ...)
    spark.createDataFrame(updatedLines, schema)
}

// Read S3 files and repartition() and cache()
val lines: RDD[String] = spark.sparkContext
    .textFile(pathToFiles, numFiles) 
    .repartition(2 * numFiles) // double the parallelism
    .cache()

val numRawLines = lines.count()

// Custom process each line and cache the table
val convertedLines: DataFrame = processLines(lines)
convertedLines.createOrReplaceTempView("temp_tbl")
spark.sqlContext.cacheTable("temp_tbl")
val numRows = spark.sql("select count(*) from temp_tbl").collect().head.getLong(0)

// Select a subset of the data
val myDataFrame = spark.sql("select a, b, c from temp_tbl where field = 'xxx' ")

// Define # of parquet files to write using coalesce
val numParquetFiles = (numRows / 1000000).toInt.max(1) // coalesce expects an Int; keep at least one file
val lessParts = myDataFrame.rdd.coalesce(numParquetFiles)
val lessPartsDataFrame = spark.sqlContext.createDataFrame(lessParts, myDataFrame.schema)
lessPartsDataFrame.createOrReplaceTempView("my_view")

// Insert data from view into Hive parquet table
spark.sql("insert overwrite table destination_tbl select * from my_view")
lines.unpersist()

The app reads all S3 files => repartitions to twice the number of files => caches the RDD => applies the custom processing to each line => creates a temp view and caches the table => counts the rows => selects a subset of the data => decreases the number of partitions => creates a view over the subset => inserts into the Hive destination table using the view => unpersists the RDD.

I am not sure why it takes so long to execute. Are the Spark execution parameters set incorrectly, or is something being invoked incorrectly here?


Solution

  • Before looking at the metrics, I would try the following change to your code.

    import org.apache.spark.sql.{DataFrame, SaveMode}

    private def processLines(lines: DataFrame): DataFrame = {
      lines.mapPartitions(row => ...)
    }
    
    val convertedLinesDf = spark.read.text(pathToFiles)
        .transform(processLines) // parse first so the 'field' column exists for the filter
        .filter("field = 'xxx'")
        .cache()
    
    val numLines = convertedLinesDf.count() // the dataset gets materialized in memory here; it takes time
    // Select a subset of the data; it will be fast if you have enough memory
    // Just use the DataFrame API
    val myDataFrame = convertedLinesDf.select("a", "b", "c")
    
    // coalesce here without converting to RDD; experiment to find what works best
    myDataFrame.coalesce(<desired_output_files_number>)
      .write.mode(SaveMode.Overwrite)
      .saveAsTable("destination_tbl")
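
    One note on the write step: saveAsTable with SaveMode.Overwrite lets Spark create or replace the table. If destination_tbl already exists as a Hive Parquet table whose definition you want to keep, writing with .mode(SaveMode.Overwrite).insertInto("destination_tbl") may be closer to your original insert overwrite statement; insertInto resolves columns by position, so the selected column order has to match the table schema.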
    
    • Caching is useless if you don't actually need the row counts, and it takes some memory and adds GC pressure
    • Caching the table may consume more memory and add more GC pressure
    • Converting a DataFrame to an RDD is costly because it implies ser/deser operations (see the sketch after this list)
    • I'm not sure what you're trying to do with val numParquetFiles = numRows / 1000000 and repartition(2 * numFiles). With your setup, 1000 gzip files of 30 MB each will give you 1000 partitions (gzip files are not splittable, so each file becomes a single partition). It could be fine like this. Calling repartition and coalesce may trigger a shuffle, which is costly (coalesce may not trigger a shuffle).
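
    To make the RDD point concrete, here is a small sketch of the two approaches (the partition count of 50 is only illustrative):

    // Costly: leaves the DataFrame API, forcing ser/deser plus an extra createDataFrame call
    val lessParts = myDataFrame.rdd.coalesce(50)
    val lessPartsDataFrame = spark.createDataFrame(lessParts, myDataFrame.schema)

    // Cheaper: coalesce directly on the DataFrame; no RDD conversion needed
    val coalesced = myDataFrame.coalesce(50)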

    Tell me if you get any improvements!