I'm working on a way to upsert data into my Azure SQL database using PySpark. I want to use a MERGE statement to achieve this, but I'm not sure how to make it work against Azure SQL Database. I've tried both spark.write and spark.read, but they didn't work as expected. Right now I'm connecting to the Azure SQL database over JDBC.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Connect to SQL Server using key vault credentials
scope_name = "TEST-TEST-scope"
jdbc_username = dbutils.secrets.get(scope_name, "key-username")
jdbc_password = dbutils.secrets.get(scope_name, "key-password")
jdbc_url = f"jdbc:sqlserver://server_name.database.windows.net:1433;database=test-database"
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
Below is my procedure definition.
def process_dataframe_upsert(url, dbtable, dataframe):
    dataframe.createOrReplaceTempView("v_new_entries")
    upsert_sql = """
    MERGE INTO {} AS target
    USING v_new_entries AS source
    ON target.Col1 = source.Col1 AND target.Col2 = source.Col2
    WHEN MATCHED THEN
        UPDATE SET target.Col3 = target.Col3 + 1
    WHEN NOT MATCHED THEN
        INSERT (Col1, Col2, Date, WorkShift, ExitStatus, Col3) VALUES (source.Col1, source.Col2, source.Col3)
    """.format(dbtable)
    #return_df = spark.read.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", properties=connection_properties)
    #return return_df
    dataframe.write.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", mode="append", properties=connection_properties)
A sample execution of the procedure looks like this:
process_dataframe_upsert(jdbc_url, table_name, test_df)
The error that I get is below.
com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '('.
The issue seems to be where upsert_sql is passed as the table argument:
table="(" + upsert_sql + ") AS tmp"
Are there any alternative methods to accomplish this task more effectively?
This is a limitation of Spark's JDBC table parameter: it only accepts a table name or a row-returning subquery that Spark can wrap in a SELECT, so a DML statement such as MERGE cannot be executed through it.
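For context, here is a minimal sketch of the difference, reusing the jdbc_url and connection_properties from the question (the table and column names are only placeholders):

# This works: Spark wraps the subquery as SELECT * FROM (...) tmp
existing_df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT Col1, Col2, Col3 FROM dbo.SomeTable) AS tmp",  # placeholder table/columns
    properties=connection_properties
)

# This fails: a MERGE is DML, not a row-returning subquery, which is why
# wrapping it in parentheses produces "Incorrect syntax near '('"
# spark.read.jdbc(url=jdbc_url, table="(MERGE INTO ...) AS tmp", properties=connection_properties)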
So what you can do is write the dataframe to a staging table and then run the MERGE against it using JayDeBeApi. Install the library with the command below.
pip install JayDeBeApi
Modify your function like this:
import jaydebeapi

def process_dataframe_upsert(url, dbtable, dataframe):
    # Land the incoming rows in a staging table on the SQL Server side
    dataframe.write.jdbc(url=url, table="stg", mode="overwrite", properties=connection_properties)
    upsert_sql = """
    MERGE INTO {} AS target
    USING stg AS source
    ON target.id = source.id
    WHEN MATCHED THEN
        UPDATE SET target.age = source.age
    WHEN NOT MATCHED THEN
        INSERT (id, name_1, age) VALUES (source.id, source.name_1, source.age);
    """.format(dbtable)
    # Run the MERGE over a direct JDBC connection, authenticating with the
    # same Key Vault credentials used for the Spark write
    conn = jaydebeapi.connect(
        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url,
        [jdbc_username, jdbc_password]
    )
    curs = conn.cursor()
    curs.execute(upsert_sql)
    curs.close()
    conn.close()
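A sample call, assuming a target table dbo.people that matches the columns shown below (the table name is only an example) and a small test dataframe:

test_df = spark.createDataFrame(
    [(3, "John", 25), (4, "Alice", 30), (5, "Bob", 35)],
    ["id", "name_1", "age"]
)
process_dataframe_upsert(jdbc_url, "dbo.people", test_df)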
Data in the dataframe:
id | name_1 | age
---|--------|----
 3 | John   | 25
 4 | Alice  | 30
 5 | Bob    | 35
Results in the table:
Here you can see that the staging table stg is created, and the MERGE is executed against it to update the target table.
You can also use pyodbc for this, but you need to install the ODBC driver for SQL Server on the cluster first.
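As a rough sketch of the pyodbc route, assuming the Microsoft ODBC Driver 17 for SQL Server is installed and reusing the server and database from the question's JDBC URL, plus an upsert_sql string like the one built inside the function above:

import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=server_name.database.windows.net,1433;"
    "DATABASE=test-database;"
    f"UID={jdbc_username};PWD={jdbc_password}"
)
cursor = conn.cursor()
cursor.execute(upsert_sql)  # the same MERGE statement against the staging table
conn.commit()               # pyodbc does not autocommit by default
cursor.close()
conn.close()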