We are facing a severe performance issue when reading a nested ORC file.
This is our ORC schema:
|-- uploader: string (nullable = true)
|-- email: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- startTime: string (nullable = true)
| | |-- endTime: string (nullable = true)
| | |-- val1: string (nullable = true)
| | |-- val2: string (nullable = true)
| | |-- val3: integer (nullable = true)
| | |-- val4: integer (nullable = true)
| | |-- val5: integer (nullable = true)
| | |-- val6: integer (nullable = true)
The 'data' array can contain as many as 75K objects.
In our Spark application, we flatten this ORC file, as shown below:
import org.apache.spark.sql.functions.explode
import spark.implicits._

val dataFrame = spark.read.orc(files: _*)
// Explode the nested array so that each element becomes its own row
val withData = dataFrame.withColumn("data", explode(dataFrame.col("data")))
val withUploader = withData.select($"uploader", $"data")
// Pull the struct fields out into top-level columns
val allData = withUploader
  .withColumn("val_1", $"data.val1")
  .withColumn("val_2", $"data.val2")
  .withColumn("val_3", $"data.val3")
  .withColumn("val_4", $"data.val4")
  .withColumn("val_5", $"data.val5")
  .withColumn("val_6", $"data.val6")
  .withColumn("utc_start_time", timestampUdf($"data.startTime"))
  .withColumn("utc_end_time", timestampUdf($"data.endTime"))
val result = allData.drop("data")
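timestampUdf (not shown above) converts the startTime/endTime strings to UTC timestamps. A minimal sketch of such a UDF, assuming the strings are ISO-8601 with an offset, would be:

// Sketch of timestampUdf, assuming ISO-8601 input such as "2017-01-01T12:00:00+02:00"
import java.sql.Timestamp
import java.time.OffsetDateTime
import org.apache.spark.sql.functions.udf

val timestampUdf = udf { (s: String) =>
  if (s == null) null
  else Timestamp.from(OffsetDateTime.parse(s).toInstant) // Instant is always UTC
}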
The flattening process seems to be a very heavy operation: reading a 2MB ORC file with 20 records, each containing a data array of 75K objects, takes hours of processing time. Reading the file and collecting it without flattening takes 22 seconds.
Is there a way to make Spark process the data faster?
In case this helps someone: I found that flattening the data with flatMap is much faster than doing it with explode:
dataFrame.as[InputFormat].flatMap(r => r.data.map(v => OutputFormat(v, r.uploader)))
The improvement in performance was dramatic.
Processing a file with 20 records, each containing an array of 250K rows, took 8 hours with the explode implementation and only 7 minutes with the flatMap implementation.
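For completeness, the case classes this relies on are roughly the following. This is a sketch assuming field names that match the schema in the question; DataElement is just a placeholder name for the array element type:

// Nullable integer columns are modelled as Option[Int] so the Dataset encoder accepts nulls
case class DataElement(
  startTime: String, endTime: String,
  val1: String, val2: String,
  val3: Option[Int], val4: Option[Int], val5: Option[Int], val6: Option[Int])

case class InputFormat(uploader: String, email: String, data: Seq[DataElement])
case class OutputFormat(data: DataElement, uploader: String)

import spark.implicits._
// Flatten each record's array in a single typed pass
val flattened = dataFrame.as[InputFormat]
  .flatMap(r => r.data.map(v => OutputFormat(v, r.uploader)))

Note that the typed flatMap requires defining these case classes up front, whereas explode works directly on the untyped DataFrame.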