Tags: apache-spark, amazon-s3, hive, parquet, bucket

In Apache Spark's `bucketBy`, how do you generate 1 file per bucket instead of 1 file per bucket per partition?


I am trying to use Spark's bucketBy feature on a pretty large dataset.

dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");

The problem is that my Spark cluster has about 500 partitions/tasks/executors (I'm not sure of the terminology), so I end up with files that look like:

part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet

That's 500x500=250000 bucketed parquet files! It takes forever for the FileOutputCommitter to commit that to S3.

Is there a way to generate one file per bucket, like in Hive? Or is there a better way to deal with this problem? As of now it seems like I have to choose between lowering the parallelism of my cluster (reduce number of writers) or reducing the parallelism of my parquet files (reduce number of buckets).

Thanks


Solution

  • To get 1 file per final bucket, repartition the dataframe right before writing it as a table. Use exactly the same columns you are bucketing by, and set the number of partitions equal to the number of buckets you pass to bucketBy (or to a smaller number that is a divisor of the number of buckets, though I don't see a reason to use a smaller number here).

    In your case that would probably look like this:

    dataframe.repartition(500, bucketColumn1, bucketColumn2)
        .write()
        .format("parquet")
        .bucketBy(500, bucketColumn1, bucketColumn2)
        .mode(SaveMode.Overwrite)
        .option("path", "s3://my-bucket")
        .saveAsTable("my_table");
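
    To see why this brings the file count down, here is a toy model in plain Java (not Spark itself). It assumes each row's bucket is `hash % numBuckets` and that repartitioning by the same columns routes rows to tasks with the same hash. The class name, the `hash` function, and the row counts are all made up for illustration; Spark's real hash is different, but the counting argument should be the same.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (not Spark) of how many files a bucketed write produces. */
final class BucketFileCount {

    // Hypothetical stand-in for the bucketing hash; any deterministic hash works here.
    static int hash(long key) {
        return Long.hashCode(key * 0x9E3779B97F4A7C15L);
    }

    // Non-negative modulo, so negative hashes still map into [0, n).
    static int pmod(int h, int n) {
        return Math.floorMod(h, n);
    }

    /**
     * Counts distinct (task, bucket) pairs, which is the number of output files
     * in this model: every task writes one file per bucket it holds rows for.
     * hashPartitioned = false: rows are dealt round-robin to tasks.
     * hashPartitioned = true:  rows are routed by hash % numTasks, mimicking a
     *                          repartition on the bucketing columns.
     */
    static int countFiles(int numRows, int numTasks, int numBuckets, boolean hashPartitioned) {
        Set<Long> files = new HashSet<>();
        for (int row = 0; row < numRows; row++) {
            int h = hash(row);
            int bucket = pmod(h, numBuckets);
            int task = hashPartitioned ? pmod(h, numTasks) : row % numTasks;
            files.add((long) task * numBuckets + bucket);
        }
        return files.size();
    }

    public static void main(String[] args) {
        int rows = 1_000_000;
        // Without repartitioning, nearly every task sees nearly every bucket.
        System.out.println("no repartition:   " + countFiles(rows, 500, 500, false));
        // Same hash, same modulus: each task holds exactly one bucket (at most 500 files).
        System.out.println("repartition(500): " + countFiles(rows, 500, 500, true));
        // 100 divides 500, so each bucket still lands in exactly one task (at most 500 files).
        System.out.println("repartition(100): " + countFiles(rows, 100, 500, true));
    }
}
```

    The divisor case works because `(h mod 500) mod 100` equals `h mod 100`, so a bucket id fully determines the task. With a partition count that does not divide the bucket count, one bucket can be spread over several tasks again.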
    

    When you're saving into an existing table, you need to make sure the column types match exactly. For example, if your column X is INT in the dataframe but BIGINT in the table you're inserting into, repartitioning by X as INT into 500 partitions won't line up with bucketing by X treated as BIGINT, and you'll end up with each of the 500 tasks writing 500 files again.
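
    The type mismatch can be illustrated outside Spark. The sketch below uses a Murmur3-style 32-bit mixer, fed the same number once as a 4-byte int and once as an 8-byte long. I believe Spark's bucketing hash is Murmur3-based, but treat the seed, the class name, and the helper names here as assumptions made for this example rather than Spark's actual code.

```java
/** Toy illustration: the same value hashed as 4 bytes vs 8 bytes lands in different buckets. */
final class TypeWidthHash {
    static final int SEED = 42; // assumed seed, for this sketch only

    static int mixK1(int k1) {
        k1 *= 0xcc9e2d51;
        k1 = Integer.rotateLeft(k1, 15);
        k1 *= 0x1b873593;
        return k1;
    }

    static int mixH1(int h1, int k1) {
        h1 ^= k1;
        h1 = Integer.rotateLeft(h1, 13);
        return h1 * 5 + 0xe6546b64;
    }

    // Final avalanche step; the input length in bytes is mixed into the hash.
    static int fmix(int h1, int length) {
        h1 ^= length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Hash a value stored as a 4-byte int.
    static int hashInt(int v) {
        return fmix(mixH1(SEED, mixK1(v)), 4);
    }

    // Hash a value stored as an 8-byte long: low word first, then high word.
    static int hashLong(long v) {
        int h1 = mixH1(SEED, mixK1((int) v));
        h1 = mixH1(h1, mixK1((int) (v >>> 32)));
        return fmix(h1, 8);
    }

    /** Counts values in [0, numValues) whose bucket id differs between the int and long hash. */
    static int mismatches(int numValues, int numBuckets) {
        int count = 0;
        for (int x = 0; x < numValues; x++) {
            int asInt = Math.floorMod(hashInt(x), numBuckets);
            int asLong = Math.floorMod(hashLong((long) x), numBuckets);
            if (asInt != asLong) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("values placed in different buckets: " + mismatches(10_000, 500));
    }
}
```

    In Spark terms, the practical fix is to cast the dataframe column to the table's type before calling repartition, so that both steps hash the same representation.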

    Just to be 100% clear: this repartitioning adds another step to your execution, which gathers the data for each bucket in one task (so one full shuffle of the data if it was not already partitioned the same way). I'm assuming that is exactly what you want.

    It was also mentioned in comments to another answer that you'll need to be prepared for possible issues if your bucketing keys are skewed. That is true, but Spark's default behavior doesn't help you much either if the first thing you do after loading the table is aggregate or join on the same columns you bucketed by (which seems like a very likely scenario for someone who chose to bucket by these columns). You just get a delayed issue and only see the skew when you try to load the data after writing it.

    In my opinion it would be really nice if Spark offered a setting to always repartition your data before writing a bucketed table (especially when inserting into existing tables).