Tags: amazon-web-services, pyspark, aws-glue

AWS Glue process records row wise insert update delete


I have a requirement to do incremental loading into a table based on some logic:

  • If record_id exists in the original table, then update the whole row with the new record
  • If record_id doesn't exist, then insert a new row with a new custom_id as the primary key
  • For some records, based on certain logic, the whole record needs to be deleted

Is it possible to do these kinds of row-wise operations in Glue? Thanks!


Solution

  • Thanks to Apache Iceberg, it is now possible to upsert into data lake tables, after a long wait of four years. Currently, there are two ways to do this.

    Using Athena

    The first method uses Athena: you create an Iceberg table directly from Athena, specifying it as an ICEBERG table. Once the table is created, you can run the MERGE INTO command to update or insert data as required.

    1. Directly create an Iceberg table from Athena, specifying it as an ICEBERG table:

    Amazon Athena - Creating Iceberg tables

    CREATE TABLE iceberg_table (id bigint, data string, category string)
    PARTITIONED BY (category, bucket(16, id))
    LOCATION 's3://YOUR-BUCKET/your-folder/'
    TBLPROPERTIES ( 'table_type' = 'ICEBERG' )
    
    2. After that, you can run the MERGE INTO command (supported on Athena engine version 3):

    Amazon Athena - MERGE INTO

    MERGE INTO accounts t USING monthly_accounts_update s
        ON (t.customer = s.customer)
        WHEN MATCHED
            THEN UPDATE SET purchases = s.purchases + t.purchases
        WHEN NOT MATCHED
            THEN INSERT (customer, purchases, address)
                  VALUES(s.customer, s.purchases, s.address)
    
    

    Using Glue

    The second method uses Glue, where you merge data into an Iceberg table with a Spark SQL query. This approach implements a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue; the reference link below covers it in detail.

    IcebergMergeOutputDF = spark.sql("""
        MERGE INTO job_catalog.iceberg_demo.iceberg_output t
        USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
        ON t.product_id = s.product_id
        WHEN MATCHED AND s.op = 'D' THEN DELETE
        WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time 
        WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)
    

    Here's the detailed reference: Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue