Tags: scala, apache-spark, nested, orc

Flattening a nested ORC file with Spark - Performance issue


We are facing a severe performance issue when reading a nested ORC file.

This is our ORC schema:

|-- uploader: string (nullable = true)
|-- email: string (nullable = true)
|-- data: array (nullable = true)
|    |-- element: struct (containsNull = true) 
|    |    |-- startTime: string (nullable = true)
|    |    |-- endTime: string (nullable = true)
|    |    |-- val1: string (nullable = true)
|    |    |-- val2: string (nullable = true)
|    |    |-- val3: integer (nullable = true)
|    |    |-- val4: integer (nullable = true)
|    |    |-- val5: integer (nullable = true)
|    |    |-- val6: integer (nullable = true)

The ‘data’ array could potentially contain 75K objects.

In our Spark application, we flatten this ORC as shown below:

import org.apache.spark.sql.functions.explode
import spark.implicits._ // for the $"..." column syntax

val dataFrame = spark.read.orc(files: _*)

// explode turns each element of the 'data' array into its own row
val withData = dataFrame.withColumn("data", explode(dataFrame.col("data")))
val withUploader = withData.select($"uploader", $"data")

// timestampUdf is our UDF (defined elsewhere) that converts the string timestamps to UTC
val allData = withUploader
  .withColumn("val_1", $"data.val1")
  .withColumn("val_2", $"data.val2")
  .withColumn("val_3", $"data.val3")
  .withColumn("val_4", $"data.val4")
  .withColumn("val_5", $"data.val5")
  .withColumn("val_6", $"data.val6")
  .withColumn("utc_start_time", timestampUdf($"data.startTime"))
  .withColumn("utc_end_time", timestampUdf($"data.endTime"))

// drop returns a new DataFrame without the nested column
val result = allData.drop("data")

The flattening process seems to be a very heavy operation: reading a 2MB ORC file with 20 records, each of which contains a data array with 75K objects, takes hours of processing time. Reading the file and collecting it without flattening takes 22 seconds.

Is there a way to make Spark process the data faster?


Solution

  • In case this helps someone, I found that flattening the data using flatMap is much faster than doing it with explode:

    dataFrame.as[InputFormat].flatMap(r => r.data.map(v => OutputFormat(v, r.uploader)))
    

    The improvement in performance was dramatic.

    Processing a file with 20 records, each containing an array with 250K rows, took 8 hours with the explode implementation and 7 minutes with the flatMap implementation (!). A minimal sketch of the case classes this snippet relies on is shown below.
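
    For reference, here is a hypothetical sketch of what InputFormat and OutputFormat could look like for the schema in the question. The actual case classes are not shown in the original answer; the names DataElement and flattened below are illustrative only.

    // Hypothetical case classes matching the ORC schema above.
    // Nullable integer columns are mapped to Option[Int].
    case class DataElement(
      startTime: String,
      endTime:   String,
      val1:      String,
      val2:      String,
      val3:      Option[Int],
      val4:      Option[Int],
      val5:      Option[Int],
      val6:      Option[Int])

    case class InputFormat(uploader: String, email: String, data: Seq[DataElement])

    // One output row per array element: the element struct plus the uploader
    case class OutputFormat(element: DataElement, uploader: String)

    import spark.implicits._ // provides the encoders needed by .as[...] and .flatMap

    val flattened = spark.read.orc(files: _*)
      .as[InputFormat]
      .flatMap(r => r.data.map(v => OutputFormat(v, r.uploader)))

    // If fully scalar columns are needed, the struct can be expanded afterwards, e.g.:
    // flattened.select($"element.*", $"uploader")

    The UTC timestamp conversion from the explode version can still be applied on the resulting Dataset afterwards (for example with the same UDF), so the flatMap change only replaces the explode/withColumn flattening step.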