Tags: amazon-s3, aws-glue, delta-lake

MERGE INTO with Delta Lake on AWS Glue inserting rows instead of updating


I am trying to set up a demo AWS Glue job that demonstrates upserts using the Delta Lake framework.

I have example full-load data saved as a Delta table in an S3 bucket, defined as follows:

data = {'visitor': ['foo', 'bar', 'baz'],
        'id': [1, 2, 3],
        'B': [1, 0, 1],
        'C': [1, 0, 0]}

and example incremental data with the same schema, stored under a different S3 prefix:

data_updated = {'visitor': ['foo_updated'],
                'id': [1],
                'B': [1],
                'C': [1]}

After executing the following statements:


from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

delta_df = DeltaTable.forPath(spark, "s3://example_bucket/full_load")
cdc_df = spark.read.format("delta").load("s3://example_bucket/incremental_load/")

delta_df.alias("prev_df").merge(
        source=cdc_df.alias("append_df"),
        # matching on primary key
        condition=expr("prev_df.id = append_df.id")) \
    .whenMatchedUpdate(set={
        "prev_df.B": col("append_df.B"),
        "prev_df.C": col("append_df.C"),
        "prev_df.visitor": col("append_df.visitor")}) \
    .whenNotMatchedInsert(values={
        # inserting a new row to the Delta table
        "prev_df.B": col("append_df.B"),
        "prev_df.C": col("append_df.C"),
        "prev_df.visitor": col("append_df.visitor")}) \
    .execute()

The tables in S3 were created as follows:

    import pandas as pd

    df = pd.DataFrame(data)
    dataFrame = spark.createDataFrame(df)
    dataFrame.write \
        .format("delta") \
        .mode("overwrite") \
        .save("s3://example_bucket/full_load")

    df = pd.DataFrame(data_updated)
    dataFrame = spark.createDataFrame(df)
    dataFrame.write \
        .format("delta") \
        .mode("overwrite") \
        .save("s3://example_bucket/incremental_load")

The incremental row with id 1 gets appended to the full-load table instead of the original row being updated. What am I doing wrong?


Solution

  • The overall flow should be:

    1. Enable Delta Lake for AWS Glue (a sketch follows this list).
    2. Create the two Delta tables in the Glue Data Catalog using the DataFrameWriter API.
    3. Do not run any Glue crawler or other API that updates the Glue Data Catalog.
    4. Merge.
    5. Query in Athena.
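
    A minimal sketch of step 1, assuming a Glue 4.0 job with the --datalake-formats job parameter set to delta; the Delta SQL extension and catalog settings can also be supplied through the --conf job parameter instead of in code:

    from pyspark import SparkConf
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    # Register the Delta SQL extension and make the session catalog Delta-aware
    conf = SparkConf()
    conf.set("spark.sql.extensions",
             "io.delta.sql.DeltaSparkSessionExtension")
    conf.set("spark.sql.catalog.spark_catalog",
             "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    sc = SparkContext(conf=conf)
    glue_context = GlueContext(sc)
    spark = glue_context.spark_session  # session used for all reads/writes below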

    Create and write a Delta table:

    dataFrame.write \
        .format("delta") \
        .mode("overwrite") \
        .option("path", s3_path) \
        .saveAsTable(f"{database_name}.{table_name})
    
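    For example, applied to the two demo tables (the database name delta_demo is a placeholder; the Glue database must already exist):

    import pandas as pd

    database_name = "delta_demo"  # hypothetical Glue database

    # Full-load table, registered in the Glue Data Catalog
    spark.createDataFrame(pd.DataFrame(data)).write \
        .format("delta") \
        .mode("overwrite") \
        .option("path", "s3://example_bucket/full_load") \
        .saveAsTable(f"{database_name}.full_load")

    # Incremental table holding the updated row for id 1
    spark.createDataFrame(pd.DataFrame(data_updated)).write \
        .format("delta") \
        .mode("overwrite") \
        .option("path", "s3://example_bucket/incremental_load") \
        .saveAsTable(f"{database_name}.incremental_load")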

    Merge (note that, unlike the original attempt, the target column names in the update/insert maps are unqualified and the insert values include id):

    delta_df.alias("prev_df").merge(
        cdc_df.alias("append_df"),
        # matching on primarykey
        "prev_df.id = append_df.id") \
        .whenMatchedUpdate(set= {
            "B": "append_df.B",
            "C": "append_df.C",
            "visitor": "append_df.visitor"
        })\
        .whenNotMatchedInsert(values = {
        #inserting a new row to Delta table
            "id": "append_df.id",
            "B": "append_df.B",
            "C": "append_df.C", 
            "visitor": "append_df.visitor"
        })\
        .execute()
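
    To confirm the merge behaved as an upsert, read the target path back (or run a SELECT in Athena, step 5); there should still be three rows, with id 1 now carrying the updated values:

    merged = spark.read.format("delta").load("s3://example_bucket/full_load")
    merged.orderBy("id").show()
    # Expected: 3 rows; id 1 now has visitor 'foo_updated', B = 1, C = 1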