Tags: apache-spark, apache-spark-sql, parallel-processing, orc

Splitting spark data into partitions and writing those partitions to disk in parallel


Problem outline: Say I have 300+ GB of data being processed with spark on an EMR cluster in AWS. This data has three attributes used to partition it on the filesystem for use in Hive: date, hour, and (let's say) anotherAttr. I want to write this data to the filesystem in a way that minimizes the number of files written.

What I'm doing right now is getting the distinct combinations of date, hour, and anotherAttr, plus a count of how many rows make up each combination. I collect them into a List on the driver and iterate over it, building a new DataFrame for each combination, repartitioning that DataFrame using the row count to guesstimate file size, and writing the files to disk with DataFrameWriter, with .orc finishing it off.
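Roughly, with stand-in names (df for the source DataFrame, basePath for the output root, rowsPerFile for a per-file row target - none of these are our real names), the sequential version looks like:

    // Driver-side loop: one filter + repartition + write per combination, sequentially.
    val combos = df.groupBy("date", "hour", "anotherAttr").count().collect()

    combos.foreach { row =>
      val (date, hour, attr, cnt) = (row.get(0), row.get(1), row.get(2), row.getLong(3))
      // use the row count to guesstimate how many files this combination needs
      val numFiles = math.max(1, math.ceil(cnt.toDouble / rowsPerFile).toInt)
      df.where(s"date=$date and hour=$hour and anotherAttr=$attr")
        .repartition(numFiles)
        .write
        .orc(s"$basePath/date=$date/hour=$hour/anotherAttr=$attr")
    }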

We aren't using Parquet for organizational reasons.

This method works reasonably well, and means that downstream teams using Hive instead of Spark don't see performance issues resulting from a high number of files. For example, if I take the whole 300 GB DataFrame, repartition it to 1000 partitions (in spark) on the relevant columns, and dump it to disk, it all writes in parallel and finishes in ~9 min. But that produces up to 1000 files for the larger partitions, and that destroys Hive performance. Or it destroys some kind of performance; honestly I'm not 100% sure what. I've just been asked to keep the file count as low as possible. With the method I'm using, I can keep the files to whatever size I want (relatively close, anyway), but there is no parallelism and it takes ~45 min to run, mostly waiting on file writes.
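For contrast, the fast-but-many-files variant is a single repartition plus one partitioned write, presumably something like (same assumed names as above):

    import org.apache.spark.sql.functions.col

    // One shuffle into 1000 spark partitions, then one fully parallel write.
    df.repartition(1000, col("date"), col("hour"), col("anotherAttr"))
      .write
      .partitionBy("date", "hour", "anotherAttr")
      .orc(basePath)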

It seems to me that since there's a 1-to-1 relationship between some source row and some destination row, and that since I can organize the data into non-overlapping "folders" (partitions for Hive), I should be able to organize my code/DataFrames in such a way that I can ask spark to write all the destination files in parallel. Does anyone have suggestions for how to attack this?

Things I've tested that did not work:

  1. Using a scala parallel collection to kick off the writes (roughly the sketch shown after this list). Whatever spark was doing with the DataFrames, it didn't separate the tasks out very well, and some machines were hitting massive garbage-collection problems.

  2. DataFrame.map - I tried to map across a DataFrame of the unique combinations and kick off writes from inside it, but there's no access to the DataFrame of the data I actually need from within that map - the DataFrame reference is null on the executor.

  3. DataFrame.mapPartitions - a non-starter; I couldn't come up with any way to do what I want from inside mapPartitions.
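For the record, attempt 1 looked roughly like this (a sketch; combos is assumed here to be the collected list of (date, hour, anotherAttr, count) tuples, and numFilesFor the sizing helper from the earlier sketch):

    // Attempt 1: drive the per-combination writes from a scala parallel collection.
    // (.par is built in up to Scala 2.12; 2.13 moved it to scala-parallel-collections.)
    combos.par.foreach { case (date, hour, attr, cnt) =>
      df.where(s"date=$date and hour=$hour and anotherAttr=$attr")
        .repartition(numFilesFor(cnt))
        .write
        .orc(s"$basePath/date=$date/hour=$hour/anotherAttr=$attr")
    }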

The word 'partition' is also not especially helpful here because it refers both to the concept of spark splitting up the data by some criteria, and to the way that the data will be organized on disk for Hive. I think I was pretty clear in the usages above. So if I'm imagining a perfect solution to this problem, it's that I can create one DataFrame that has 1000 partitions based on the three attributes for fast querying, then from that create another collection of DataFrames, each one having exactly one unique combination of those attributes, repartitioned (in spark, but for Hive) with the number of partitions appropriate to the size of the data it contains. Most of the DataFrames will have 1 partition, a few will have up to 10. The files should be ~3 GB, and our EMR cluster has more RAM than that for each executor, so we shouldn't see a performance hit from these "large" partitions.

Once that list of DataFrames is created and each one is repartitioned, I could ask spark to write them all to disk in parallel.

Is something like this possible in spark?

One thing I'm conceptually unclear on: say I have

val x = spark.sql("select * from source")
val y = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr")
val z = x.where(s"date=$date and hour=$hour and anotherAttr=$anotherAttr2")

To what extent is y a different DataFrame than z? If I repartition y, what effect does the shuffle have on z, and on x for that matter?


Solution

  • We had (almost) the same problem, and we ended up working directly with RDDs (instead of DataFrames) and implementing our own partitioning mechanism (by extending org.apache.spark.Partitioner).

    Details: we are reading JSON messages from Kafka. The JSON should be grouped by customerid/date/more fields and written in Hadoop using Parquet format, without creating too many small files.

    The steps are (simplified version): a) Read the messages from Kafka and transform them to a structure of RDD[(GroupBy, Message)]. GroupBy is a case class containing all the fields that are used for grouping.

    b) Use a reduceByKeyLocally transformation to obtain a map of metrics (number of messages / message sizes / etc.) for each group - e.g. Map[GroupBy, GroupByMetrics]

    c) Create a GroupPartitioner that uses the previously collected metrics (and some input parameters like the desired Parquet size, etc.) to compute how many partitions should be created for each GroupBy object. Basically we are extending org.apache.spark.Partitioner and overriding numPartitions and getPartition(key: Any) - a condensed sketch of all five steps follows step e) below.

    d) Partition the RDD from a) using the previously defined partitioner: newPartitionedRdd = rdd.partitionBy(ourCustomGroupByPartitioner)

    e) Invoke spark.sparkContext.runJob with two parameters: the first one is the RDD partitioned at d); the second one is a custom function (func: (TaskContext, Iterator[T]) => U) that will write the messages taken from Iterator[T] into Hadoop/Parquet.
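    A condensed sketch of the five steps, under stated assumptions: Message and writeGroupToParquet are placeholders, and the within-group routing is only illustrative (random routing is acceptable here because the partition is only used to route writes, never for key lookups):

    import org.apache.spark.{Partitioner, TaskContext}
    import org.apache.spark.rdd.RDD

    object GroupedParquetWriter {
      case class Message(payload: String) // placeholder for the parsed Kafka message
      case class GroupBy(customerId: String, date: String /* , more grouping fields */)

      // c) a partitioner sized from the per-group metrics
      class GroupPartitioner(metrics: Map[GroupBy, Long], messagesPerPartition: Long)
          extends Partitioner {

        // how many partitions each group needs (ceiling division, at least 1)
        private val partitionsPerGroup: Map[GroupBy, Int] =
          metrics.map { case (g, count) =>
            g -> math.max(1, math.ceil(count.toDouble / messagesPerPartition).toInt)
          }
        // give each group a contiguous range of partition ids
        private val firstPartitionOf: Map[GroupBy, Int] = {
          var next = 0
          partitionsPerGroup.map { case (g, n) => val first = next; next += n; g -> first }
        }

        override def numPartitions: Int = partitionsPerGroup.values.sum

        override def getPartition(key: Any): Int = {
          val g = key.asInstanceOf[GroupBy]
          // spread the group's messages across its range of partitions
          firstPartitionOf(g) + scala.util.Random.nextInt(partitionsPerGroup(g))
        }
      }

      def writeGroups(rdd: RDD[(GroupBy, Message)], messagesPerPartition: Long): Unit = {
        // b) per-group message counts, reduced locally and returned to the driver
        val metrics: Map[GroupBy, Long] =
          rdd.mapValues(_ => 1L).reduceByKeyLocally(_ + _).toMap
        // d) route every message to one of its group's partitions
        val partitioned = rdd.partitionBy(new GroupPartitioner(metrics, messagesPerPartition))
        // e) one task per partition, each writing its iterator out as Parquet
        rdd.sparkContext.runJob(partitioned,
          (ctx: TaskContext, it: Iterator[(GroupBy, Message)]) => writeGroupToParquet(ctx, it))
      }

      def writeGroupToParquet(ctx: TaskContext, it: Iterator[(GroupBy, Message)]): Unit =
        ??? // placeholder for the actual Hadoop/Parquet writing logic
    }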

    Let's say that we have 100 mil messages, grouped like this:

    Group1 - 2 mil
    Group2 - 80 mil
    Group3 - 18 mil

    and we decided that we have to use 1.5 mil messages per partition to obtain Parquet files greater than 500 MB. We'll end up with 2 partitions for Group1, 54 for Group2, and 12 for Group3.
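
    That partition count is just the ceiling division from the partitioner sketch above, applied to this example:

    // ceil(messages / 1.5 mil) per group
    val counts = Map("Group1" -> 2000000L, "Group2" -> 80000000L, "Group3" -> 18000000L)
    counts.map { case (g, c) => g -> math.ceil(c.toDouble / 1500000L).toInt }
    // => Map(Group1 -> 2, Group2 -> 54, Group3 -> 12)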