My scenario / issue:
inData = spark.readStream.format("eventhubs").load()
udfdata = inData.select(from_json(myudf("column"), schema).alias("result")).select("result.*")
filter1 = udfdata.filter("column == 'filter1'")
filter2 = udfdata.filter("column == 'filter2'")

# write filter1 to two different sinks
filter1.writeStream.format("delta").start(table1)
filter1.writeStream.format("eventhubs").start()

# write filter2 to two different sinks
filter2.writeStream.format("delta").start(table2)
filter2.writeStream.format("eventhubs").start()
Each time you call .writeStream...start() you create a new, independent streaming query. This means that for every output sink you define, Spark reads the input source again and reprocesses the dataframe from scratch.
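To make that concrete, here is a hedged sketch (the query names and checkpoint paths are made up, and table1/table2 are assumed to be Delta paths as in the snippet above): every start() call returns its own StreamingQuery, and you can list them through spark.streams.active.

# Each start() below creates a separate streaming query with its own
# checkpoint/offset tracking, so each one reads the Event Hub independently.
q1 = (filter1.writeStream.format("delta")
        .queryName("filter1_to_delta")                     # hypothetical query name
        .option("checkpointLocation", "/tmp/chk/filter1")  # hypothetical checkpoint path
        .start(table1))
q2 = (filter2.writeStream.format("delta")
        .queryName("filter2_to_delta")
        .option("checkpointLocation", "/tmp/chk/filter2")
        .start(table2))

for q in spark.streams.active:
    print(q.name, q.id)   # the independent queries show up here, one per start()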
If you want to read and process the data only once and then write it to multiple sinks, you can use the foreachBatch sink as a workaround:
from pyspark.sql.functions import from_json

def filter_and_output(udfdata, batchId):
    # Inside foreachBatch, udfdata is a regular batch dataframe, no longer a streaming one
    udfdata.cache()

    filter1 = udfdata.filter("column == 'filter1'")
    filter2 = udfdata.filter("column == 'filter2'")

    # write filter1 to both sinks
    filter1.write.format("delta").mode("append").save(table1)
    filter1.write.format("eventhubs").save()

    # write filter2 to both sinks
    filter2.write.format("delta").mode("append").save(table2)
    filter2.write.format("eventhubs").save()

    udfdata.unpersist()

inData = spark.readStream.format("eventhubs").load()
udfdata = inData.select(from_json(myudf("column"), schema).alias("result")).select("result.*")

udfdata.writeStream.foreachBatch(filter_and_output).start()
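One caveat worth noting: foreachBatch only gives at-least-once guarantees, so a micro-batch can be re-executed after a failure and the Delta tables could receive duplicate rows. On a recent Delta Lake version you can make the Delta writes idempotent with the txnAppId/txnVersion writer options, using the batchId that Spark passes to the function. A minimal sketch (the helper name and app id are made up):

# Sketch of an idempotent Delta append for use inside filter_and_output.
# Delta skips the write when it has already committed the same (txnAppId, txnVersion) pair.
def write_delta_idempotent(df, batchId, path, app_id):
    (df.write
       .format("delta")
       .option("txnAppId", app_id)       # stable id per logical sink, e.g. "filter1_to_delta"
       .option("txnVersion", batchId)    # the micro-batch id Spark passes to foreachBatch
       .mode("append")
       .save(path))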
You can learn more about foreachBatch in the Spark Structured Streaming documentation.
To answer your questions