I have a requirement to do incremental loading into a table based on some logic.
Is it possible to do this kind of row-wise operation in Glue? Thanks!
Thanks to Apache Iceberg, it is now possible to upsert into data lake tables, after a long wait of 4 years. Currently, there are two ways to do this.
The first method uses Athena: create the table directly in Athena, declaring it as an Iceberg table with 'table_type' = 'ICEBERG'. Once the table exists, run a MERGE INTO statement to update or insert rows as required.
Amazon Athena - Creating Iceberg tables
CREATE TABLE iceberg_table (id bigint, data string, category string)
PARTITIONED BY (category, bucket(16, id))
LOCATION 's3://YOUR-BUCKET/your-folder/'
TBLPROPERTIES ( 'table_type' = 'ICEBERG' )
MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
WHEN MATCHED
THEN UPDATE SET purchases = s.purchases + t.purchases
WHEN NOT MATCHED
THEN INSERT (customer, purchases, address)
VALUES(s.customer, s.purchases, s.address)
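To make the row-wise behaviour of that MERGE concrete, here is a minimal pure-Python sketch of the same matched/not-matched logic. It is only a model of the semantics, not how Athena or Iceberg execute it. The function name merge_accounts and the sample rows are made up for illustration, and it assumes each customer appears at most once in the update batch.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def merge_accounts(target: Dict[str, Row], updates: Iterable[Row]) -> Dict[str, Row]:
    """Model of: MERGE INTO accounts t USING monthly_accounts_update s
    ON (t.customer = s.customer). Hypothetical helper, for illustration only.
    Assumes each customer appears at most once in `updates`."""
    # Work on a copy so the original target rows stay untouched.
    merged = {customer: dict(row) for customer, row in target.items()}
    for s in updates:
        customer = s["customer"]
        if customer in target:
            # WHEN MATCHED THEN UPDATE SET purchases = s.purchases + t.purchases
            merged[customer]["purchases"] = s["purchases"] + target[customer]["purchases"]
        else:
            # WHEN NOT MATCHED THEN INSERT (customer, purchases, address)
            merged[customer] = {
                "customer": customer,
                "purchases": s["purchases"],
                "address": s["address"],
            }
    return merged


# Made-up sample data: one existing customer, one new customer.
accounts = {"alice": {"customer": "alice", "purchases": 3, "address": "1 Main St"}}
monthly_accounts_update = [
    {"customer": "alice", "purchases": 2, "address": "1 Main St"},
    {"customer": "bob", "purchases": 5, "address": "9 Oak Ave"},
]
result = merge_accounts(accounts, monthly_accounts_update)
```

In this sketch the existing customer gets its purchases summed and the new customer is inserted as-is, which is the per-row decision the WHEN MATCHED / WHEN NOT MATCHED clauses express.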
The second method uses Glue: merge data into an Iceberg table from a Glue job with a Spark SQL query. This is how a CDC-based UPSERT in a data lake is implemented with Apache Iceberg and AWS Glue.
IcebergMergeOutputDF = spark.sql("""
MERGE INTO job_catalog.iceberg_demo.iceberg_output t
USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
ON t.product_id = s.product_id
WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
""")
Here's the detailed reference: Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue
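The Glue query above assumes the Spark session already has an Iceberg catalog registered under the name job_catalog. As a rough sketch, these are the Spark settings typically used to back such a catalog with the Glue Data Catalog. The helper name iceberg_glue_conf is hypothetical, the warehouse path is a placeholder, and the exact settings for your Glue and Iceberg versions should be checked against the reference.

```python
from typing import Dict


def iceberg_glue_conf(catalog: str, warehouse: str) -> Dict[str, str]:
    """Build Spark settings for an Iceberg catalog backed by AWS Glue.

    Hypothetical helper for illustration; `catalog` is the name used in SQL
    (e.g. job_catalog) and `warehouse` is the S3 location for table data.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg SQL extensions such as MERGE INTO.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers the catalog name with Iceberg's Spark catalog implementation.
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        # Stores table metadata pointers in the AWS Glue Data Catalog.
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # Reads and writes data files through Iceberg's S3 file IO.
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
    }


# Placeholder bucket path, same style as the Athena example.
conf = iceberg_glue_conf("job_catalog", "s3://YOUR-BUCKET/your-folder/")
for key, value in sorted(conf.items()):
    print(f"{key}={value}")
```

Each pair can be passed to SparkSession.builder.config(key, value) before getOrCreate(). The incoming DataFrame also needs to be registered with createOrReplaceTempView("incremental_input_data") so the USING subquery can read it.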