apache-spark · pyspark · parquet

partitionOverwriteMode dynamic and "logical" partitions


Can dynamic partitionOverwriteMode selectively overwrite parquet files that are not separated into Hive-style partitions (i.e. different directories)? With row group filtering in parquet, it is possible to skip reading parquet files that could not possibly contain the data you are filtering on. I am wondering whether dynamic partitionOverwriteMode can take advantage of a similar optimization and selectively overwrite only some of the parquet files within the same Hive-style partition.

According to the documentation, "When in dynamic partition overwrite mode, operations overwrite all existing data in each logical partition for which the write commits new data. Any existing logical partitions for which the write does not contain data remain unchanged." https://docs.databricks.com/en/delta/selective-overwrite.html#dynamic-partition-overwrites

However, I am not sure what "logical partition" means in this context.
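
For reference, the mode can be enabled either per session or per write; a minimal sketch (the DataFrame name and bucket path below are just placeholders):

    # Session-wide setting
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    
    # Or per write, which takes precedence over the session setting
    df.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .option("partitionOverwriteMode", "dynamic") \
        .parquet("s3://my-bucket/orders/")  # placeholder path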


Solution

  • I did some testing (results below) to evaluate the behavior of dynamic partitionOverwriteMode, inspired by this blog, and confirmed that the documentation is referring only to Hive-style partitions (as created by partitionBy) when it describes selectively overwriting partitions. It does not selectively overwrite individual parquet files based on any finer-grained logical grouping.

    from pyspark.sql import Row

    # Assumes `spark` (the active SparkSession) and `s3_path` (the target output
    # path) are already defined; display() is the Databricks notebook helper.
    
    data = [
        Row(order_id=1, order_amount=100, year=2025, month=1, day=2),
        Row(order_id=2, order_amount=150, year=2025, month=1, day=5),
        Row(order_id=3, order_amount=200, year=2025, month=1, day=6),
        Row(order_id=4, order_amount=250, year=2025, month=2, day=6),
        Row(order_id=5, order_amount=300, year=2025, month=2, day=23),
        Row(order_id=6, order_amount=350, year=2024, month=12, day=11)
    ]
    
    df_write_1 = spark.createDataFrame(data)
    display(df_write_1)
    

    Initial DataFrame written

    # maxRecordsPerFile=1 forces one row per parquet file, so each Hive-style
    # partition directory ends up containing multiple files
    df_write_1.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .option("partitionOverwriteMode", "dynamic") \
        .option("maxRecordsPerFile", 1) \
        .parquet(s3_path)
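
    To see the Hive-style directory layout this first write produces, one option is to tag each row with the file it came from; a quick sketch against the same s3_path:

    from pyspark.sql.functions import input_file_name
    
    # The source file path of each row reveals the year=.../month=... directory it landed in
    display(
        spark.read.parquet(s3_path)
            .withColumn("source_file", input_file_name())
            .orderBy("order_id")
    )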
    
    data = [
        Row(order_id=1, order_amount=100, year=2025, month=1, day=2),
        Row(order_id=2, order_amount=150, year=2025, month=1, day=5),
        Row(order_id=3, order_amount=200, year=2025, month=1, day=6),
        Row(order_id=5, order_amount=300, year=2025, month=2, day=23),
        Row(order_id=7, order_amount=250, year=2025, month=2, day=23),
        Row(order_id=8, order_amount=500, year=2025, month=2, day=6),
        Row(order_id=9, order_amount=250, year=2025, month=3, day=23),
    ]
    
    df_write_2 = spark.createDataFrame(data)
    display(df_write_2)
    

    New incremental DataFrame written.

    # Dynamic overwrite: only the partitions for which df_write_2 contains rows
    # should be replaced; all other existing partitions should remain untouched
    df_write_2.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .option("partitionOverwriteMode", "dynamic") \
        .option("maxRecordsPerFile", 1) \
        .parquet(s3_path)
    
    df_read = spark.read.parquet(s3_path)
    display(df_read.orderBy("order_id"))
    

    DataFrame that is read after incremental dynamic overwrite performed.

    • Hive-style partition year=2024/month=12 was left untouched, which is why order_id=6 persists in df_read.
    • Hive-style partitions year=2025/month=1 and year=2025/month=2 were both completely overwritten (confirmed by the new timestamps on ALL parquet files in these two directories and by the disappearance of order_id=4; a file-listing sketch follows this list).
    • Hive-style partition year=2025/month=3 was newly created, and order_id=9 now appears in df_read. This behavior was not originally in question, but I added it to the test for good measure.
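
    One way to check the per-file timestamps mentioned above is Spark's binaryFile source, which exposes a modificationTime column for every file it lists; a sketch (selecting only metadata columns so file contents are not pulled back):

    # Recursively list every parquet file under s3_path with its modification time
    files = (
        spark.read.format("binaryFile")
            .option("pathGlobFilter", "*.parquet")
            .option("recursiveFileLookup", "true")
            .load(s3_path)
            .select("path", "modificationTime", "length")
    )
    display(files.orderBy("path"))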