Tags: amazon-s3, pyspark, etl, parquet

UPSERT in Parquet with PySpark


I have Parquet files in S3 with the following partitioning: year / month / date / some_id. Using Spark (PySpark), each day I would like to UPSERT the last 14 days: replace the existing data in S3 (one Parquet file per partition) without deleting the days that are older than 14 days. I tried two save modes:

  • append - wasn't good, because it just adds another file.
  • overwrite - deletes the past data and the data for other partitions.

Is there any way or best practice to overcome this? Should I read all the data from S3 on each run and write it back again? Or maybe rename the files so that append replaces the current file in S3?

Thanks a lot!


Solution

  • I usually do something similar. In my case I run an ETL and append one day of data to a partitioned Parquet table:

    The key is to write only the data you want to replace (in my case the current date), partition by the date column, and enable dynamic partition overwrite so that only the partitions being written are replaced.

    This will preserve all old data. As an example:

    # Dynamic partition overwrite (Spark 2.3+) makes mode("overwrite") replace
    # only the partitions present in the DataFrame instead of the whole table.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    (
        sdf
        .write
        .format("parquet")
        .mode("overwrite")
        .partitionBy("date")
        # "replaceWhere" only works with the Delta format (and expects a full
        # predicate such as "date = '2020-01-27'"), so it is omitted here.
        .save(uri)
    )
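
    To match the 14-day window from the question, one option is to recompute only the last 14 days and rewrite just those partitions. This is a sketch under the same assumptions as above: dynamic partition overwrite is enabled, and the date column holds dates in YYYY-MM-DD format.

    from datetime import date, timedelta

    import pyspark.sql.functions as F

    # Keep only the last 14 days so the overwrite touches exactly those
    # date partitions; anything older is left untouched.
    cutoff = (date.today() - timedelta(days=14)).isoformat()
    last_14_days = sdf.filter(F.col("date") >= F.lit(cutoff))

    (
        last_14_days
        .write
        .format("parquet")
        .mode("overwrite")
        .partitionBy("date")
        .save(uri)
    )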
    

    Also, you could take a look at delta.io, which builds on the Parquet format and adds features like ACID transactions.
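
    If you do move to Delta, the replaceWhere write option is a natural way to express this kind of partition-level UPSERT. A minimal sketch, assuming delta-spark is installed, the Spark session is configured with the Delta extensions, new_sdf holds the recomputed last 14 days, and the S3 path is hypothetical:

    from datetime import date, timedelta

    cutoff = (date.today() - timedelta(days=14)).isoformat()

    # Atomically replace only the rows matching the predicate; partitions
    # older than the cutoff are left untouched.
    (
        new_sdf
        .write
        .format("delta")
        .mode("overwrite")
        .partitionBy("date")
        .option("replaceWhere", f"date >= '{cutoff}'")
        .save("s3://my-bucket/my-table")  # hypothetical path
    )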