Tags: apache-spark, pyspark, databricks, azure-databricks, databricks-sql

Performing an upsert operation from Databricks to an Azure SQL database


I'm working on a way to upsert data into my Azure SQL database using PySpark. I want to use a MERGE statement to achieve this, but I'm not sure how to make it work against Azure SQL Database. I've tried both spark.write and spark.read, but neither worked as expected. Right now, I'm using JDBC to connect to the Azure SQL database.

from pyspark.sql import SparkSession
from pyspark.sql.types import *


# Connect to SQL Server using key vault credentials
scope_name = "TEST-TEST-scope"
jdbc_username = dbutils.secrets.get(scope_name, "key-username")
jdbc_password = dbutils.secrets.get(scope_name, "key-password")
jdbc_url = f"jdbc:sqlserver://server_name.database.windows.net:1433;database=test-database"
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Below is my procedure definition.

def process_dataframe_upsert(url, dbtable, dataframe):

    dataframe.createOrReplaceTempView("v_new_entries")

    upsert_sql = """
        MERGE INTO {} AS target
        USING v_new_entries AS source
        ON target.Col1 = source.Col1 AND target.Col2 = source.Col2
        WHEN MATCHED THEN
            UPDATE SET target.Col3 = target.Col3 + 1
        WHEN NOT MATCHED THEN
            INSERT (Col1,Col2,Date,WorkShift,ExitStatus,Col3) VALUES (source.Col1,source.Col2,source.Col3)
    """.format(dbtable)

    #return_df = spark.read.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", properties=connection_properties)
    #return return_df
    dataframe.write.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", mode="append", properties=connection_properties)

A sample invocation of the procedure:

 process_dataframe_upsert(jdbc_url, table_name, test_df)

The error I get is:

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '('.

The issue seems to be with passing upsert_sql as the table argument:

table="(" + upsert_sql + ") AS tmp"

Are there any alternative methods to accomplish this task more effectively?


Solution

  • There is a limitation here: Spark's JDBC table parameter accepts only a table name or a SELECT subquery, because Spark wraps whatever you pass in a SELECT. A MERGE statement is not a query, so passing it there produces the syntax error you are seeing.

    So, what you can do is write the DataFrame to a staging table with Spark and then run the MERGE directly over JDBC using JayDeBeApi. Install the library using the command below.

    pip install JayDeBeApi
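    To see the limitation concretely, note what Spark does with the table argument: it wraps it in a SELECT. The tiny helper below is hypothetical; it just mirrors the string the question builds.

```python
def wrap_as_subquery(sql: str) -> str:
    # Mirrors the question's table="(" + upsert_sql + ") AS tmp"
    return "(" + sql + ") AS tmp"

# Spark effectively issues: SELECT * FROM <table argument>
# That is valid T-SQL for a SELECT subquery:
print("SELECT * FROM " + wrap_as_subquery("SELECT 1 AS x"))
# → SELECT * FROM (SELECT 1 AS x) AS tmp

# But "SELECT * FROM (MERGE ...) AS tmp" is not valid T-SQL,
# hence "Incorrect syntax near '('".
```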
    

    Modify your function like this:

    import jaydebeapi
    
    def process_dataframe_upsert(url, dbtable, dataframe):
    
        # Stage the new rows in a regular table first
        dataframe.write.jdbc(url=url, table="stg", mode="overwrite",
                             properties=connection_properties)
    
        upsert_sql = """
        MERGE INTO {} AS target
        USING stg AS source
        ON target.id = source.id
        WHEN MATCHED THEN
            UPDATE SET target.age = source.age
        WHEN NOT MATCHED THEN
            INSERT (id, name_1, age) VALUES (source.id, source.name_1, source.age);
        """.format(dbtable)
    
        # Run the MERGE over a direct JDBC connection,
        # reusing the Key Vault credentials from the question
        conn = jaydebeapi.connect(
            "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            url,
            [jdbc_username, jdbc_password],
        )
        curs = conn.cursor()
        curs.execute(upsert_sql)
        curs.close()
        conn.commit()
        conn.close()
    

    Data in the dataframe:

    id name_1 age
    3 John 25
    4 Alice 30
    5 Bob 35
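    If you want to check the statement without touching a database, one option is to split the string building into a small helper that can be unit-tested on its own. This is a sketch, not part of the answer's code; the table and column names are the example ones used above:

```python
def build_upsert_sql(dbtable: str, staging: str = "stg") -> str:
    """Render the MERGE statement used by process_dataframe_upsert."""
    return """
    MERGE INTO {target} AS target
    USING {staging} AS source
    ON target.id = source.id
    WHEN MATCHED THEN
        UPDATE SET target.age = source.age
    WHEN NOT MATCHED THEN
        INSERT (id, name_1, age) VALUES (source.id, source.name_1, source.age);
    """.format(target=dbtable, staging=staging)

print(build_upsert_sql("dbo.people"))
```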

    Results in the table:

    (screenshot of the target table showing the upserted rows)

    Here, you can see that the staging table is created and the MERGE runs against it to update the target table.

    You can also use pyodbc for this, but you would need to install the Microsoft ODBC driver on the cluster as well.
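    A rough pyodbc sketch, assuming ODBC Driver 18 for SQL Server is installed; the server, database, and credential values below are placeholders, not from the original post:

```python
def build_odbc_connection_string(server: str, database: str,
                                 username: str, password: str) -> str:
    # Connection string for the Microsoft ODBC Driver 18 for SQL Server
    return (
        "DRIVER={ODBC Driver 18 for SQL Server};"
        "SERVER=" + server + ";DATABASE=" + database + ";"
        "UID=" + username + ";PWD=" + password + ";Encrypt=yes;"
    )

conn_str = build_odbc_connection_string(
    "server_name.database.windows.net", "test-database",
    "my_user", "my_password",  # placeholders; use your Key Vault secrets
)

# With pyodbc installed, the MERGE would then run like this:
# import pyodbc
# with pyodbc.connect(conn_str, autocommit=True) as conn:
#     conn.execute(upsert_sql)
```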