Tags: scala, apache-spark, parquet

In Spark, how do you read parquet files that were written with bucketBy, and preserve the bucketing data?


In Apache Spark 2.4.5, how do you open up a set of parquet files that were written with bucketBy and saveAsTable?

For example:

case class VeryVeryDeeplyNestedThing(
  s: String,
  nested1: OtherVeryDeeplyNestedThing
)
case class OtherVeryDeeplyNestedThing (
  youGetTheIdeaNoOneWantsToHandWriteASqlStatementForThese: NestedMcNesty
)
List(VeryVeryDeeplyNestedThing(...)).toDS()
  .write
  .bucketBy(512, "s")
  .option("path", "/tmp/output")
  .format("parquet")
  .saveAsTable("mytable")

Now there's a set of parquet files in /tmp/output. Move the files in /tmp/output to /tmp/newPlace, and start a completely new spark session.

spark.read.parquet("/tmp/newPlace")
  .whatGoesHere?

What do you need to do to read them back in with the same bucketing information that they were written with? It doesn't seem like that information is baked into the parquet files themselves, or is that what happens?

[Edit: added a worked example, partially taken from https://kb.databricks.com/_static/notebooks/data/bucketing-example.html per @thebluephantom, which I think shows that reading does in fact require something special]

If you create the parquet files like this:

scala> def base = spark.range(1, 160000, 1, 16).select($"id" as "key", rand(12) as "value")
base: org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> base.write.format("parquet").bucketBy(16, "key").sortBy("value").option("path", "/tmp/example").mode(SaveMode.Overwrite).saveAsTable("bucketed")

scala> base.write.format("parquet").option("path", "/tmp/exampleunbucketed").mode(SaveMode.Overwrite).saveAsTable("unbucketed")

scala> val t2 = spark.table("bucketed")
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> val t3 = spark.table("bucketed")
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

// This is joining two bucketed tables

scala> t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#58]
+- *(2) BroadcastHashJoin [key#51L], [key#57L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#57L, value#58]
         +- *(1) Filter isnotnull(key#57L)
            +- *(1) FileScan parquet default.bucketed[key#57L,value#58] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

That has a FileScan parquet default.bucketed on both sides. Now just do a plain read of the parquet files, and explain the join:

scala> val t4 = spark.read.parquet("/tmp/example")
t4: org.apache.spark.sql.DataFrame = [key: bigint, value: double]

scala> t3.join(t4, Seq("key")).explain()
== Physical Plan ==
*(2) Project [key#51L, value#52, value#64]
+- *(2) BroadcastHashJoin [key#51L], [key#63L], Inner, BuildRight
   :- *(2) Project [key#51L, value#52]
   :  +- *(2) Filter isnotnull(key#51L)
   :     +- *(2) FileScan parquet default.bucketed[key#51L,value#52] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [key#63L, value#64]
         +- *(1) Filter isnotnull(key#63L)
            +- *(1) FileScan parquet [key#63L,value#64] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-172-31-66-61.ec2.internal:50070/tmp/example], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>

t4 doesn't have anything indicating that it's bucketed.

Does this matter? Is it still bucketed? Am I misreading the explain output? Or do I have to do something to make sure t4 uses buckets?


Solution

  • You don't. bucketBy is a table-based API; it's that simple.

    Use bucketBy to pre-partition (and, with sortBy, pre-sort) the tables so that subsequent JOINs are faster because shuffling is avoided. It is thus well suited to ETL and to temporary, intermediate-result processing in general.

    Reading requires nothing special to be added to the query, but the JOINed tables must BOTH be bucketed and have the same number of buckets. See this excellent post: https://kb.databricks.com/_static/notebooks/data/bucketing-example.html. Also, spark.sql.shuffle.partitions must equal the number of buckets.

    UPDATE

    With small data a broadcast hash join may be chosen instead of a sort-merge join, so set the following (a sketch verifying the resulting plan is at the end of this answer):

    spark.conf.set("spark.sql.sources.bucketing.enabled", true)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    

    Also, I suggest using spark.table, not spark.read.parquet...; bucketBy only works with the table API. See https://blog.taboola.com/bucket-the-shuffle-out-of-here/. A sketch of registering the moved files as a bucketed table over their new location follows below.
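
    Since the bucket spec lives only in the metastore and not in the parquet files, one way to get the moved files back with their bucketing intact is to register an external table over them with the same bucket spec. This is not spelled out above, so treat it as a minimal sketch: it assumes Spark 2.4.x, that the files from the worked example (16 buckets on key, sorted by value) now sit in /tmp/newPlace with their original file names (Spark derives each file's bucket id from its name), and the table name bucketed_moved is invented here for illustration.

    // Re-declare the bucket spec over the moved files; Spark never infers it
    // from the parquet files themselves.
    spark.sql("""
      CREATE TABLE bucketed_moved (key BIGINT, value DOUBLE)
      USING PARQUET
      CLUSTERED BY (key) SORTED BY (value) INTO 16 BUCKETS
      LOCATION '/tmp/newPlace'
    """)

    // Read through the table API so the bucket spec is picked up from the metastore.
    val t5 = spark.table("bucketed_moved")
    val t6 = spark.table("bucketed_moved")
    // With spark.sql.autoBroadcastJoinThreshold set to -1 (see the UPDATE above),
    // this should plan as a SortMergeJoin with no Exchange, and the FileScan nodes
    // should show SelectedBucketsCount: 16 out of 16.
    t5.join(t6, Seq("key")).explain()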
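
    Finally, a quick sketch of checking that the UPDATE settings take effect, assuming the bucketed table from the worked example is still registered in the metastore:

    spark.conf.set("spark.sql.sources.bucketing.enabled", true)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.shuffle.partitions", 16)  // match the number of buckets, per the note above

    val a = spark.table("bucketed")
    val b = spark.table("bucketed")
    // Expect a SortMergeJoin over two FileScan parquet default.bucketed nodes,
    // each showing SelectedBucketsCount: 16 out of 16 and no Exchange operator.
    a.join(b, Seq("key")).explain()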