This is a real mystery for me: when I try to write a PySpark DataFrame into an Azure SQL table using JDBC, I run into a strange situation. While the write function runs, my table is somehow changed for no apparent reason, and the wrong data is sent to Azure. Afterwards my PySpark DataFrame also shows the same wrong data. Here is the relevant part of my code:
print("sparkDF_cleaned :")
sparkDF_cleaned.show()

sparkDF_cleaned.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbcUrl) \
    .option("dbtable", "dbo.upsert_test") \
    .option("user", jdbcUsername) \
    .option("password", jdbcPassword) \
    .save()

print(f"data loaded to table {db_table_name}")

print("sparkDF_cleaned :")
sparkDF_cleaned.show()
The output is:
sparkDF_cleaned :
+------------+---+-----+----------+----------------------+
| id_date| id|value| _date|datetime_of_extraction|
+------------+---+-----+----------+----------------------+
|1 2022-05-01| 1| 17|2022-05-01| 2022-06-01|
|1 2022-05-06| 1| 6|2022-05-06| 2022-06-13|
|2 2022-05-02| 2| 10|2022-05-02| 2022-06-01|
|3 2022-05-03| 3| 15|2022-05-03| 2022-06-01|
+------------+---+-----+----------+----------------------+
data loaded to table upsert_test
sparkDF_cleaned :
+------------+---+-----+----------+----------------------+
| id_date| id|value| _date|datetime_of_extraction|
+------------+---+-----+----------+----------------------+
|1 2022-05-06| 1| 6|2022-05-06| 2022-06-13|
|2 2022-05-02| 2| 5|2022-05-02| 2022-06-13|
+------------+---+-----+----------+----------------------+
The Azure table receives the data exactly as shown in the second output. Dear colleagues, why does this happen? Thanks in advance for your time.
I still can't avoid the problem when using the PySpark JDBC overwrite with a transformed PySpark table.
However, I decided to write the PySpark table to a file, read the saved data back from that file, and then post the data using the PySpark JDBC overwrite.
It does not seem to be the best solution, but it works. If there are better solutions, I'll be happy to read about them.
path = "*DataBricksWorkspace*/temp_table.json"

# remove any previous copy, write the DataFrame out as JSON, then read it back
dbutils.fs.rm(path, True)
sparkDF.write.json(path)
sparkDF_copy = spark.read.json(path)

sparkDF_copy.write.jdbc(url=jdbcUrl,
                        table=db_table_name,
                        mode="overwrite",
                        properties=connectionProperties)
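A potentially simpler alternative might be to break the DataFrame's dependency on the source table with a checkpoint instead of the JSON round-trip. This is only a sketch I haven't verified in my setup; the checkpoint directory below is a made-up placeholder, and it assumes the issue is that the lazy plan re-reads the very table that the overwrite is replacing.

# assumed checkpoint location, not from my workspace
spark.sparkContext.setCheckpointDir("*DataBricksWorkspace*/checkpoints")

# checkpoint() materializes the data, so the write below no longer depends on the source table
sparkDF_checkpointed = sparkDF_cleaned.checkpoint()

sparkDF_checkpointed.write.jdbc(url=jdbcUrl,
                                table=db_table_name,
                                mode="overwrite",
                                properties=connectionProperties)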