python, apache-spark, pyspark

How to create parquet partitions with Spark 3.3 and update parquet files every day with new information


I'm new to Spark overall and to parquet files as well. Please be patient, and if you know the answer please be specific. I have an S3 bucket which receives a new CSV file every day with these columns:

["profile", "date", "local_timezone", "utility", "iso", "unit", *[f"hour_{i}" for i in range(24)]

My task is to work with ETL jobs and make partitions based on profile. So I created an ETL job, loaded that file, and tried to run this:

output_df.select("*").write.partitionBy("profile").mode("append").parquet(s3_output_path)
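
For context, the full job is roughly this (s3_input_path and s3_output_path are placeholders; the real job has a few more transformations):

# Read the day's CSV, then append it to the partitioned parquet output
input_df = spark.read.option("header", "true").option("inferSchema", "true").csv(s3_input_path)
output_df = input_df  # plus the rest of the ETL transformations
output_df.write.partitionBy("profile").mode("append").parquet(s3_output_path)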

With append mode, it creates partitions based on profiles:

profile=category_one/   
profile=category_two/   

and adds a new parquet file in each of these partitions:

part-00000-63df9987-67e4-46bb-b1ef-2fefc1614736.c000.snappy.parquet

The problem is that if I process the next day's CSV file, it creates another new parquet file, and so on. Even if I run the same date twice, it creates two separate parquet files.

I need the SAME parquet file to be updated with new data every day, and if a profile and date combination already exists, the whole row should be replaced.

I tried many different options with overwrite and save. Nothing works. With overwrite it creates a new file, but if I try to update new rows in the data frame it shows me the error "File is not existing".

I also tried to modify the data frame output_df before partitioning, and it's still not working; it gives all sorts of errors.

So, I just need to run the partitioning every day and update or append data for the same partitions, with one parquet file in each partition.

Let me know if you have any questions.

Thank you for your time


Solution

  • What you need is something that deals well with what are known as UPSERTs (UPDATE/INSERT). In vanilla Spark, handling UPSERTs becomes increasingly complex and inefficient as your dataset scales, because Spark's native capabilities don't directly support transactional updates or inserts out-of-the-box (a hand-rolled sketch of what that looks like follows below).
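
    To see why, here is roughly what a hand-rolled upsert looks like in plain Spark. This is a sketch only, assuming output_df holds the day's rows, s3_output_path is your existing partitioned output, and s3_staging_path is a placeholder I'm introducing for a temporary location:

    # Assumes an active SparkSession `spark`, plus output_df and s3_output_path
    # from your job; s3_staging_path is a placeholder for a temporary location.

    # Keep the existing rows whose (profile, date) is NOT in today's data,
    # then add all of today's rows -- an upsert done by hand.
    existing_df = spark.read.parquet(s3_output_path)
    kept_df = existing_df.join(output_df, ["profile", "date"], "left_anti")
    merged_df = kept_df.unionByName(output_df)

    # Spark cannot safely overwrite a path it is still lazily reading from
    # (that is where your "File is not existing" error comes from), so the
    # merged result goes to a staging location and has to be swapped into
    # place afterwards -- and the whole dataset gets rewritten every day.
    merged_df.write.partitionBy("profile").mode("overwrite").parquet(s3_staging_path)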

    Thankfully, there have been some solutions to this particular problem. Three that are very successful right now are Apache Iceberg, Apache Hudi, and Databricks Delta Lake. Out of the three, I find Iceberg the most useful because of its better AWS integration (but this is an opinion, so YMMV; do try the other two, as you may like them more).

    Apache Iceberg excels at exactly this kind of problem by providing robust ACID transaction support, including a straightforward MERGE INTO operation for UPSERTs.

    Anyway, this is how you would approach this with Iceberg:

    You would either need to be able to use spark.jars.packages so that the jar that includes the Iceberg functionality is made available, OR provide the needed jar yourself by downloading the right version from here. In general, to get Iceberg working on your system, consult this getting-started guide.
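
    If you go the second route (for example on an air-gapped cluster), a minimal sketch of wiring in a downloaded jar looks like this; the jar path and version are assumptions, so use the runtime build that matches your Spark 3.3 / Scala version:

    from pyspark.sql import SparkSession

    # Point Spark at a locally downloaded Iceberg runtime jar instead of
    # resolving it from Maven via spark.jars.packages.
    # The jar path and version below are placeholders.
    spark = (
        SparkSession.builder
        .appName("IcebergLocalJar")
        .config("spark.jars", "/path/to/iceberg-spark-runtime-3.3_2.12-1.3.1.jar")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate()
    )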

    Now, assuming that you are able to use spark.jars.packages (which means you are not working on an air-gapped system), this is how you could approach your problem:

    from pyspark.sql import SparkSession
    
    # Initialize Spark (pick Iceberg runtime and hadoop-aws builds that match your Spark 3.3 / Hadoop versions)
    spark = SparkSession.builder \
        .appName("IcebergMergeExample") \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark3-runtime:0.12.0,org.apache.hadoop:hadoop-aws:3.2.0") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.local.type", "hadoop") \
        .config("spark.sql.catalog.local.warehouse", "s3a://your-bucket/your-iceberg-warehouse/") \
        .getOrCreate()
    
    # Create an Iceberg table, if not already existing
    spark.sql("""
        CREATE TABLE IF NOT EXISTS local.db.table_name (
            profile STRING,
            date STRING,
            local_timezone STRING,
            utility STRING,
            iso STRING,
            unit STRING,
            hour_0 DOUBLE,
            hour_1 DOUBLE,
            -- hour_2 through hour_22 omitted here for brevity; list all 24 hour columns
            hour_23 DOUBLE
        )
        USING iceberg
        PARTITIONED BY (profile)
    """)
    
    # Load new data into a temporary table
    new_data_df.createOrReplaceTempView("temp_new_data")
    
    # Use the MERGE INTO statement that is provided by Iceberg to upsert data
    spark.sql("""
        MERGE INTO local.db.table_name AS target
        USING temp_new_data AS source
        ON target.profile = source.profile AND target.date = source.date
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)