
Merging a Spark DF using INSERT INTO


I have an existing table, which I'd like to append two columns to. I create a Spark dataframe:

spark_df = spark.createDataFrame(df)

Then I'd like to use MERGE INTO as so:

spark.sql(f"""MERGE INTO x.y AS m
USING {spark_df} AS s
ON m.id = s.id
WHEN MATCHED THEN
 UPDATE SET m.id = s.id
WHEN NOT MATCHED 
 THEN INSERT (id, colnew_1) VALUES (id, spark_df[["myCol"]])""")

I get a syntax error when the query tries to parse spark_df. Is this functionality possible? I understand that a Delta table has to be created first so that the MERGE operation is supported, but I'm a bit confused about the sequence of events. For example, I can create a Delta table like so:

CREATE TABLE x.y_delta (id bigint, colnew_1 bigint) USING delta

However, this table is empty. I suppose an intermediate step is to copy the original table completely into this new Delta table, and then use that Delta table accordingly. Though I'm not convinced that this is right either.
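For what it's worth, the "copy everything into a new Delta table" step described above can be done in a single CTAS statement rather than creating an empty table and filling it. This is only a sketch: the table names come from the question, and on Databricks you would pass the string to spark.sql; here the statement is just assembled and printed.

```python
# Sketch only: a CTAS that copies x.y into a new Delta table while adding
# the new column (initially NULL, to be populated later by the MERGE).
ctas_sql = (
    "CREATE TABLE x.y_delta USING delta AS "
    "SELECT *, CAST(NULL AS bigint) AS colnew_1 FROM x.y"
)
print(ctas_sql)
```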


Solution

  • As suggested by @blackbishop, create a temp view for the dataframe:

    df12.createOrReplaceTempView("temp_table1")
    

    I followed the same suggestion and it works fine. Follow the steps below:

    Sample dataframe df12:

    from pyspark.sql import types as T
    df12 = spark.createDataFrame(
        [
            (1, "vam", 400),
            (2, "gov", 456)
        ],
        T.StructType(
            [
                T.StructField("id", T.IntegerType(), True),
                T.StructField("col1", T.StringType(), True),
                T.StructField("myCol", T.IntegerType(), True)
            ]
        ),
    )
    

    Create the Delta table and insert some sample rows:

    spark.sql("CREATE TABLE x.y_delta2 (id int, col1 string, myCol int) USING delta")
    spark.sql("INSERT INTO x.y_delta2 VALUES (1,'govind',123),(3,'deep',456)")
    


    Create the temp view:

    df12.createOrReplaceTempView("temp_table1")
    

    Merge operation (note that the WHEN MATCHED clause sets the join key to itself, so matched rows are left unchanged; only unmatched source rows are inserted):

    spark.sql("""MERGE INTO x.y_delta2 AS m
    USING temp_table1 AS s
    ON m.id = s.id
    WHEN MATCHED THEN
     UPDATE SET m.id = s.id
    WHEN NOT MATCHED
     THEN INSERT (m.id, m.col1, m.myCol) VALUES (s.id, s.col1, s.myCol)""")
    

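To see what the MERGE does without a cluster, here is a plain-Python sketch of its semantics on the same sample rows: the target dict stands in for x.y_delta2's seeded data and the source dict for df12, both keyed by id.

```python
# Plain-Python sketch of the MERGE above: id -> (col1, myCol) per row.
target = {1: ("govind", 123), 3: ("deep", 456)}   # seeded x.y_delta2
source = {1: ("vam", 400), 2: ("gov", 456)}       # rows of df12

for sid, (col1, mycol) in source.items():
    if sid in target:
        # WHEN MATCHED THEN UPDATE SET m.id = s.id: the join key already
        # matches, so this update leaves the row unchanged.
        pass
    else:
        # WHEN NOT MATCHED THEN INSERT (id, col1, myCol)
        target[sid] = (col1, mycol)

print(sorted(target.items()))
# -> [(1, ('govind', 123)), (2, ('gov', 456)), (3, ('deep', 456))]
```

The result matches what the merged Delta table contains: id 1 keeps its original values, id 2 is newly inserted from df12, and id 3 is untouched because it never appears in the source.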