Tags: apache-spark, apache-spark-sql, apache-iceberg

How to enable storage partitioned join in spark/iceberg?


How do I use the storage partitioned join feature in Spark 3.3.0? I've tried it out, and my query plan still shows the expensive ColumnarToRow and Exchange steps. My setup is as follows:

  • joining two Iceberg tables, both partitioned on hours(ts), bucket(20, id)
  • joins attempted both on a.id = b.id AND a.ts = b.ts and on a.id = b.id alone
  • tables are large, 100+ partitions used, 100+ GB of data to join
  • spark: 3.3.0
  • iceberg: org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.1
  • set my spark session config with spark.sql.sources.v2.bucketing.enabled=true
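For reference, a single Spark flag is not enough: later releases that do support the feature end up requiring several session configs on the Spark side plus one on the Iceberg side. The sketch below shows the commonly cited set as of Spark 3.4+ / Iceberg 1.2+; exact keys and defaults may vary by version, so treat this as an illustration rather than a guarantee for Spark 3.3.0 / Iceberg 0.14.1:

```properties
# Core switch for storage-partitioned joins (exists in Spark 3.3).
spark.sql.sources.v2.bucketing.enabled=true

# Added in Spark 3.4: handle sides with mismatched/missing partition values.
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true

# Allow SPJ when the join keys are a subset of the partition keys.
spark.sql.requireAllClusterKeysForCoPartition=false

# Iceberg-side flag (Iceberg 1.2+): keep Spark's view of data grouping
# aligned with the table's partition spec during planning.
spark.sql.iceberg.planning.preserve-data-grouping=true
```

With all of these set and both tables partitioned identically, the Exchange nodes should disappear from the join's physical plan.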

I read through all the docs I could find on the storage partitioned join feature.

I'm wondering whether there are other settings I need to configure, whether something still needs to be implemented on the Iceberg side, or whether I've set something up wrong. I'm excited about this feature; it could really speed up some of our large joins.


Solution

  • Support hasn't been implemented in Iceberg yet. In fact, the work appears to be in progress at the time of writing: https://github.com/apache/iceberg/issues/430#issuecomment-1283014666

    This answer should be updated when there's a release of Iceberg that supports Spark storage-partitioned joins.