Tags: python, apache-spark, pyspark, apache-spark-sql, parquet

Overwrite existing Parquet dataset with modified PySpark DataFrame


The use case is to append a column to a Parquet dataset and then rewrite it efficiently at the same location. Here is a minimal example.

Create a pandas DataFrame and write as a partitioned Parquet dataset.

import pandas as pd
df = pd.DataFrame({
        'id': ['a','a','a','b','b','b','b','c','c'],
        'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')

Then load the Parquet dataset as a PySpark view and create a modified dataset as a PySpark DataFrame.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql("SELECT id, value, 0 AS segment FROM data")

At this point, sf contains the same data as df plus an additional segment column of all zeros. I would like to efficiently overwrite the existing Parquet dataset at path with sf, as a Parquet dataset in the same location. Below is what does not work. I would also prefer not to write sf to a new location, delete the old Parquet dataset, and rename, as that does not seem efficient.

# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)

Solution

  • My answer in short: you shouldn't :\

    One principle of big data (and Spark is built for big data) is to never overwrite data in place. Sure, .mode('overwrite') exists, but this is not a correct use of it.

    My guesses as to why it could (should) fail:

    • You add a column, so the written dataset has a different schema from the one currently stored there. This can create schema confusion.
    • You overwrite the input data while it is still being processed. Spark reads some rows, processes them, and overwrites the input files, but those files are still the input for the rows that have not been processed yet.
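
    The second point can be illustrated without Spark. The sketch below is only a plain-Python analogy, not Spark code: lazy_rows and overwrite_in_place are hypothetical names, and a generator stands in for a lazily evaluated DataFrame. Because nothing is read until iteration starts, opening the same file for writing destroys the input before the first row is ever read.

```python
import os
import tempfile


def lazy_rows(path):
    """Yield rows from path. The file is not opened until iteration starts."""
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")


def overwrite_in_place(path):
    """Read from and write to the same path (analogy for the failing job)."""
    rows = lazy_rows(path)          # only a plan so far, nothing has been read
    with open(path, "w") as out:    # opening for write truncates the input
        for row in rows:            # the read starts here, on an empty file
            out.write(f"{row},0\n")  # would append the new column


def demo():
    path = os.path.join(tempfile.mkdtemp(), "data.csv")
    with open(path, "w") as f:
        f.write("a,0\na,1\nb,3\n")
    overwrite_in_place(path)
    with open(path) as f:
        return f.read()


result = demo()
print(repr(result))  # prints '' because the input was lost before it was read
```

    The exact failure in Spark may look different, but the ordering problem is the same: the output location is cleared before the lazily planned read has run.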

    What I usually do in such a situation is create another dataset and, once there is no reason to keep the old one (i.e. when the processing is completely finished), clean it up. To remove files, you can check this post on how to delete HDFS files. It should work for all files accessible by Spark. However, it is in Scala, so I'm not sure whether it can be adapted to PySpark.
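
    For a dataset on the local filesystem (as with c:/data.parquet in the question), the write-then-clean-up approach could look roughly like the sketch below. It is an assumption-laden illustration rather than a tested recipe for HDFS or object stores: replace_dataset, the ".tmp" staging suffix, and fake_write are hypothetical names, and only the standard library is used so the swap logic can be run without Spark.

```python
import os
import shutil
import tempfile


def replace_dataset(path, write_fn):
    """Write a new dataset next to `path`, then swap it in for the old one.

    write_fn(target) must write the complete new dataset to `target`.
    """
    tmp = path.rstrip("/\\") + ".tmp"  # hypothetical staging location
    if os.path.exists(tmp):
        shutil.rmtree(tmp)             # discard leftovers from a failed run
    write_fn(tmp)                      # old data is still intact while this runs
    shutil.rmtree(path)                # only now remove the old dataset
    os.rename(tmp, path)               # move the new dataset into place


def demo():
    root = tempfile.mkdtemp()
    path = os.path.join(root, "data.parquet")
    os.makedirs(os.path.join(path, "id=a"))
    with open(os.path.join(path, "id=a", "old.parquet"), "w") as f:
        f.write("old")

    def fake_write(target):            # stand-in for the Spark write
        os.makedirs(os.path.join(target, "id=a"))
        with open(os.path.join(target, "id=a", "new.parquet"), "w") as f:
            f.write("new")

    replace_dataset(path, fake_write)
    return path, sorted(os.listdir(os.path.join(path, "id=a")))


dataset_path, files = demo()
print(files)
```

    With the objects from the question, the call would presumably be replace_dataset(path, lambda tmp: sf.write.partitionBy('id').mode('overwrite').parquet(tmp)). The old files are deleted only after the write has finished, so sf can still read its input while the new copy is produced.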

    Note that efficiency is not a good reason to overwrite: it does more work than simply writing.