apache-spark, apache-spark-sql, spark-structured-streaming

Insert/Upsert/Delete (CDC) PySpark Structured Streaming


  1. Let's suppose we have an initial file like this:

     Id | Number | ChangeMode
     1  | 10     | insert
     2  | 20     | insert
     3  | 30     | insert
     4  | 40     | insert
     5  | 50     | insert
  2. My table in MariaDB should then look like this:

     Id | Number
     1  | 10
     2  | 20
     3  | 30
     4  | 40
     5  | 50
  3. Then another file like this arrives in the folder:

     Id | Number | ChangeMode
     1  | 123    | upsert
     2  | 456    | upsert
     3  | 30     | remove
  4. And the table should end up like this:

     Id | Number
     1  | 123
     2  | 456
     4  | 40
     5  | 50

How can I use the "ChangeMode" column to tell Spark whether it should insert, update, or delete each row?

I already wrote this part of the code, but I don't know how to proceed from here, and I also don't know how to implement the delete.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = (SparkSession
    .builder
    .appName("Spark Structured Streaming CDC")
    .config("spark.driver.extraClassPath", "E:\\pyspark_projects\\mariadb-java-client-2.7.1.jar")
    .getOrCreate())

streamingSchema = StructType([
    StructField("Id", IntegerType(),True),
    StructField("Number", IntegerType(),True),
    StructField("ChangeMode", StringType(),True),
])

streamingDF = (spark.readStream
        .format("csv")
        .option("sep", "|")
        .schema(streamingSchema)
        .csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))

db_target_properties = {"user":"root", "password":"root", "driver":"org.mariadb.jdbc.Driver"}
db_target_url = "jdbc:mariadb://127.0.0.1:3306/projects"

streamingInsert = streamingDF.where("ChangeMode == 'insert'")
streamingUpsert = streamingDF.where("ChangeMode == 'upsert'")

def insert(df, epoch_id):
    streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
    pass

def upsert(df, epoch_id):
    streamingUpsert.write.jdbc(url=db_target_url, table="cdc", mode="update", properties=db_target_properties)
    pass

queryInsert = streamingInsert.writeStream.foreachBatch(insert).start()
queryUpdate = streamingUpsert.writeStream.foreachBatch(upsert).start()

spark.streams.awaitAnyTermination()

I'm having the following error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "C:\Spark\python\pyspark\sql\utils.py", line 207, in call
raise e
File "C:\Spark\python\pyspark\sql\utils.py", line 204, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "main.py", line 32, in insert
streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
File "C:\Spark\python\pyspark\sql\dataframe.py", line 231, in write
return DataFrameWriter(self)
File "C:\Spark\python\pyspark\sql\readwriter.py", line 645, in __init__
self._jwrite = df._jdf.write()
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Spark\python\pyspark\sql\utils.py", line 134, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;

at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)

If anyone knows another method of doing the same, please let me know.
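For reference, the AnalysisException happens because insert and upsert call .write on the original streaming DataFrames (streamingInsert / streamingUpsert) instead of the static micro-batch DataFrame that foreachBatch passes in as df. A minimal sketch of the corrected pattern, reusing the same JDBC settings, could look like this (it only covers the insert path, since the plain JDBC writer has no upsert or delete save mode):

def write_batch(df, epoch_id):
    # df is a static micro-batch DataFrame, so .write is allowed here
    inserts = df.where("ChangeMode = 'insert'")
    inserts.write.jdbc(url=db_target_url, table="cdc",
                       mode="append", properties=db_target_properties)
    # Upserts and deletes need hand-written SQL (see the solution below):
    # the JDBC writer only supports append/overwrite/ignore/error modes.

query = (streamingDF.writeStream
    .foreachBatch(write_batch)
    .start())

query.awaitTermination()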


Solution

  • I found a way to do it, using another module to write to MariaDB: for insert/update I only need one statement, and for delete I use a separate one.

    Hope it helps someone in the future!

    import findspark
    findspark.init()
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    import mariadb
    
    spark = (SparkSession
        .builder
        .appName("Spark Structured Streaming CDC")
        .getOrCreate())
    
    streamingSchema = StructType([
        StructField("Id", IntegerType(),True),
        StructField("Number", IntegerType(),True),
        StructField("ChangeMode", StringType(),True)
    ])
    
    streamingDF = (spark.readStream
        .format("csv")
        .option("sep", "|")
        .schema(streamingSchema)
        .csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))
    
    class RowWriter:
        def open(self, partition_id, epoch_id):
            print("Opened %d, %d" % (partition_id, epoch_id))
            return True
        def process(self, row):
            conn = mariadb.connect(
                user="root",
                password="root",
                host="127.0.0.1",
                port=3306,
                database="projects"
            )
            cur = conn.cursor()
            # 'insert' and 'upsert' rows both map to a single upsert statement
            if row[2] in ('insert', 'upsert', 'update'):
                cur.execute("INSERT INTO cdc (Id,Number) VALUES ("+str(row[0])+", "+str(row[1])+") ON DUPLICATE KEY UPDATE Number = "+str(row[1])+"")
            # 'remove' (or 'delete') rows are deleted by Id
            if row[2] in ('remove', 'delete'):
                cur.execute("DELETE FROM cdc WHERE Id = "+str(row[0])+"")
            conn.commit()
            conn.close()
            
        def close(self, error):
            print("Closed with error: %s" % str(error))
    
    query = (streamingDF.writeStream
        .foreach(RowWriter())
        .option("checkpointLocation", "E:\\pyspark_projects\\stream_cdc\\files\\checkpoint")
        .start())
    
    query.awaitTermination()
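
    Note that ON DUPLICATE KEY UPDATE only behaves as an upsert when the cdc table has a primary or unique key on Id. The table definition is not shown in the post, so as an assumption, a one-time setup like the following would match the behaviour above:

    import mariadb

    # Hypothetical one-time setup; the original post never shows the DDL.
    # A primary key on Id is what makes ON DUPLICATE KEY UPDATE act as an upsert.
    conn = mariadb.connect(user="root", password="root",
                           host="127.0.0.1", port=3306, database="projects")
    cur = conn.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS cdc (Id INT PRIMARY KEY, Number INT)")
    conn.commit()
    conn.close()

    Also keep in mind that RowWriter opens and closes a connection for every row; for larger files, doing the same work in foreachBatch with one connection per micro-batch would be considerably cheaper.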