Tags: apache-spark, pyspark, databricks, delta-lake

Different number of partitions after spark.read & filter depending on Databricks runtime


I have Parquet files saved in the following Delta Lake layout:

data/
├─ _delta_log/
├─ Year=2018/
│  ├─ Month=1/
│  │  ├─ Day=1/
│  │  │  ├─ part-01797-bd37cd91-cea6-421c-b9bb-0578796bc909.c000.snappy.parquet
│  │  ├─ ...
│  │  ├─ Day=31/
│  │  │  ├─ part-01269-3b9691cf-311a-4d26-8348-c0ced0e06bf0.c000.snappy.parquet
│  ├─ Month=2/
├─ Year=2019/
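
For context, a layout like this is typically produced by a partitioned Delta write. A minimal sketch, assuming a source DataFrame named events_df with Year, Month and Day columns (the name events_df is a placeholder, not from the question):

events_df.write.format("delta") \
    .partitionBy("Year", "Month", "Day") \
    .save("/data")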

When I read all the data, filter for a given month, and show the number of partitions:

from pyspark.sql import functions as F

df = spark.read.format("delta").load("/data")
df = df.filter((F.col("Year") == 2018) & (F.col("Month") == 1))
df.rdd.getNumPartitions()

I get different results depending on which Databricks Runtime I use:

  • DBR 10.4: 15 partitions
  • DBR 12.2: 10 partitions

The total number of Parquet files for January 2018 in file storage is 31 (one per day, as the ASCII tree above indicates).

My questions are:

  1. Why, in both cases, is the number of partitions reduced from 31 to 15 or 10 after filtering?
  2. Why the difference between DBR 10.4 and DBR 12.2? Since there is a TimeStamp column in the DataFrame, could it have something to do with Ingestion Time Clustering which was released in DBR 11.2?

Solution

    1. Partitions are not tied to the number of records in your DataFrame.

    Link: https://sparkbyexamples.com/spark/spark-partitioning-understanding/

    By default, Spark partitions data based on a number of factors, and those factors differ depending on where you run the job and in what mode (see the first sketch after this list).

    2. Spark tries to balance utilizing the entire cluster against reducing the amount of shuffling, which moves data across the network and is expensive (see the second sketch after this list).

    Link: https://medium.com/@dipayandev/everything-you-need-to-understand-data-partitioning-in-spark-487d4be63b9c
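
To illustrate point 1: for file-based reads, Spark packs input files into scan partitions based on their sizes and size-related settings rather than on the raw file count, which is how 31 small daily files can collapse into 15 or 10 partitions. A hedged sketch of how you could observe this yourself, using the standard spark.sql.files.maxPartitionBytes setting (the 32 MB value is purely illustrative):

from pyspark.sql import functions as F

# Lowering maxPartitionBytes generally yields more (smaller) scan partitions;
# raising it lets Spark pack more small files into a single partition.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))

df = spark.read.format("delta").load("/data")
df = df.filter((F.col("Year") == 2018) & (F.col("Month") == 1))
print(df.rdd.getNumPartitions())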
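
To illustrate point 2: if you need a specific number of partitions, you can make the trade-off yourself. repartition(n) performs a full shuffle of data across the network, while coalesce(n) only merges existing partitions without a full shuffle (and can only reduce the count). Continuing from the df in the previous sketch:

# Full shuffle: data is redistributed across the cluster into 31 partitions.
df_shuffled = df.repartition(31)

# No full shuffle: existing partitions are merged down to 4.
df_merged = df.coalesce(4)

print(df_shuffled.rdd.getNumPartitions(), df_merged.rdd.getNumPartitions())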