pyspark · databricks · parquet

PySpark append with partitionBy overwrites unpartitioned parquet


In Azure Databricks, I have a Parquet file that is not partitioned by any column. When I subsequently append a new dataframe with partitionBy("some_column"), the data of my original unpartitioned dataframe is overwritten. Why does this happen? Shouldn't the append repartition my initial data, or at least give a warning that data will be overwritten?

## Example 1, only data from df2 is in file_path
# Init dataframe
df1.write.mode("overwrite").parquet(file_path)
# Append with partitionBy 
df2.write.mode("append").partitionBy("column1").parquet(file_path)

## Example 2, correctly appended data from both frames is in file_path
# Init dataframe
df1.write.mode("overwrite").partitionBy("column1").parquet(file_path)
# Append with partitionBy 
df2.write.mode("append").partitionBy("column1").parquet(file_path)

Solution

  • Problem

    The behavior you observed in Example 1 occurs because appending a dataframe with a different partitioning to an existing Parquet location does not trigger a repartitioning of the original data. Instead, new partition directories are created under the original path to store the partitions. When Spark reads the location after the second write, it infers the partitioning from the new directory structure, so the original unpartitioned files are no longer picked up and your first dataframe's data is effectively lost.
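
    To see what actually happens on disk, you can list the target directory after the two writes from Example 1. This is an illustrative sketch only; dbutils is the utility object available in Databricks notebooks and file_path is the path used in the question.

    # List the contents of file_path after the writes from Example 1 (illustrative).
    for f in dbutils.fs.ls(file_path):
        print(f.name)

    # Roughly expected layout:
    #   part-00000-....snappy.parquet   <- df1's original unpartitioned files, still on disk
    #   column1=A/                      <- df2's new partition directories
    #   column1=B/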

    Solution

    Some other implementations, such as Delta tables, raise an error when you try to write with a different partitioning.

    This example shows your use case with some mock data:

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    # Define schema
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
    
    # Create data
    data1 = [(1, "Alice"),
             (2, "Bob"),
             (3, "Charlie")]
    
    data2 = [(1, "Alice"),
             (2, "Bob"),
             (3, "Charlie"),
             (4, "Joe")]
    
    # Create DataFrame
    df1 = spark.createDataFrame(data1, schema)
    df2 = spark.createDataFrame(data2, schema)
    
    df1.write.mode("overwrite").format('delta').save('dbfs:/FileStore/testdelta')
    # Append with partitionBy 
    df2.write.mode("append").partitionBy("name").format('delta').save('dbfs:/FileStore/testdelta')
    
    display(spark.read.format('delta').load('dbfs:/FileStore/testdelta'))
    

    The code should raise an exception as follows:

    AnalysisException: Partition columns do not match the partition columns of the table.
    Given: [`name`]
    Table: []
    

    This prevents your data from being silently lost.
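
    If you need to stay on plain Parquet instead of Delta, a possible workaround (a sketch only, not part of the original answer; it assumes df1 and df2 share the same schema and uses a hypothetical new_file_path) is to repartition the existing data yourself instead of relying on append:

    # Read back the unpartitioned data written in Example 1.
    existing = spark.read.parquet(file_path)

    # Combine the old rows with the new dataframe (schemas assumed to match).
    combined = existing.unionByName(df2)

    # Rewrite everything with an explicit partitioning. Writing to a fresh,
    # hypothetical new_file_path avoids overwriting a path that is being read from.
    combined.write.mode("overwrite").partitionBy("column1").parquet(new_file_path)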

    Extra Ball

    This article dives into the internal Parquet file structure and the Spark reader: