Id | Number | ChangeMode |
---|---|---|
1 | 10 | insert |
2 | 20 | insert |
3 | 30 | insert |
4 | 40 | insert |
5 | 50 | insert |
Id | Number |
---|---|
1 | 10 |
2 | 20 |
3 | 30 |
4 | 40 |
5 | 50 |
Id | Number | ChangeMode |
---|---|---|
1 | 123 | upsert |
2 | 456 | upsert |
3 | 30 | remove |
Id | Number |
---|---|
1 | 123 |
2 | 456 |
4 | 40 |
5 | 50 |
How can i use the column "ChangeMode" as a reference to say to spark when it will insert/update/delete?
I already wrote this part of code, but i dont know how to proceed from here, and also dont know how to implement delete.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = (SparkSession
.builder
.appName("Spark Structured Streaming CDC")
.config("spark.driver.extraClassPath", "E:\\pyspark_projects\\mariadb-java-client-2.7.1.jar")
.getOrCreate())
streamingSchema = StructType([
StructField("Id", IntegerType(),True),
StructField("Number", IntegerType(),True),
StructField("ChangeMode", StringType(),True),
])
streamingDF = (spark.readStream
.format("csv")
.option("sep", "|")
.schema(streamingSchema)
.csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))
db_target_properties = {"user":"root", "password":"root", "driver":"org.mariadb.jdbc.Driver"}
db_target_url = "jdbc:mariadb://127.0.0.1:3306/projects"
streamingInsert = streamingDF.where("ChangeMode == 'insert'")
streamingUpsert = streamingDF.where("ChangeMode == 'upsert'")
def insert(df, epoch_id):
streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
pass
def upsert(df, epoch_id):
streamingUpsert.write.jdbc(url=db_target_url, table="cdc", mode="update", properties=db_target_properties)
pass
queryInsert = streamingInsert.writeStream.foreachBatch(insert).start()
queryUpdate = streamingUpsert.writeStream.foreachBatch(upsert).start()
spark.streams.awaitAnyTermination()
I'm having the following error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "C:\Spark\python\pyspark\sql\utils.py", line 207, in call
raise e
File "C:\Spark\python\pyspark\sql\utils.py", line 204, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "main.py", line 32, in insert
streamingInsert.write.jdbc(url=db_target_url, table="cdc", mode="append", properties=db_target_properties)
File "C:\Spark\python\pyspark\sql\dataframe.py", line 231, in write
return DataFrameWriter(self)
File "C:\Spark\python\pyspark\sql\readwriter.py", line 645, in __init__
self._jwrite = df._jdf.write()
File "C:\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Spark\python\pyspark\sql\utils.py", line 134, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;
at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy17.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:56)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
If anyone knows another method of doing the same, please let me know.
I found a way to do that, usign another module to write in mariaDB, to insert/update i only use one command, and to delete i use a separate command:
Hope it helps someone in future!
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import mariadb
spark = (SparkSession
.builder
.appName("Spark Structured Streaming CDC")
.getOrCreate())
streamingSchema = StructType([
StructField("Id", IntegerType(),True),
StructField("Number", IntegerType(),True),
StructField("ChangeMode", StringType(),True)
])
streamingDF = (spark.readStream
.format("csv")
.option("sep", "|")
.schema(streamingSchema)
.csv("E:\\pyspark_projects\\stream_cdc\\files\\input\\"))
class RowWriter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
conn = mariadb.connect(
user="root",
password="root",
host="127.0.0.1",
port=3306,
database="projects"
)
cur = conn.cursor()
if(row[2] == 'insert' or 'update'):
cur.execute("INSERT INTO cdc (Id,Number) VALUES ("+str(row[0])+", "+str(row[1])+") ON DUPLICATE KEY UPDATE Number = "+str(row[1])+"")
if(row[2] == 'delete'):
cur.execute("DELETE FROM cdc WHERE Id = "+str(row[0])+"")
conn.commit()
conn.close()
def close(self, error):
print("Closed with error: %s" % str(error))
query = (streamingDF.writeStream
.foreach(RowWriter())
.option("checkpointLocation", "E:\\pyspark_projects\\stream_cdc\\files\\checkpoint")
.start())
query.awaitTermination()