apache-spark, spark-structured-streaming, azure-databricks

Structured Streaming: writing to multiple sinks


My scenario:

  1. Get data from a stream and call a UDF which returns a JSON string. One of the attributes in the JSON string is UniqueId, which the UDF generates with Guid.NewGuid() (C#); a sketch of such a UDF is shown after the issue list below.
  2. The DataFrame output of the UDF is written to multiple streams/sinks based on some filter.

Issue:

  1. Each sink gets a new value for the UniqueId that was generated by the UDF. How can I keep the same UniqueId across all sinks?
  2. If each sink gets a different value for UniqueId, does that mean my UDF is getting called once per sink?
  3. If the UDF is getting invoked multiple times, what is the option to have it called once and then just write the same data to the different sinks?
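
For illustration only, a rough PySpark stand-in for such a UDF (the actual UDF is written in C#; uuid.uuid4() stands in for Guid.NewGuid(), and the JSON field names here are just placeholders):

import json
import uuid

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Returns a JSON string with a freshly generated UniqueId on every invocation,
# so every query that re-executes it sees a different value.
@udf(returnType=StringType())
def myudf(column):
    return json.dumps({"UniqueId": str(uuid.uuid4()), "payload": column})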
inData = spark.readStream.format("eventhubs").load()

udfdata = inData.select(from_json(myudf("column"), schema).alias("result")).select("result.*")

filter1 = udfdata.filter("column == 'filter1'")
filter2 = udfdata.filter("column == 'filter2'")

# write filter1 to two different sinks
filter1.writeStream.format("delta").start(table1)
filter1.writeStream.format("eventhubs").start()

# write filter2 to two different sinks
filter2.writeStream.format("delta").start(table2)
filter2.writeStream.format("eventhubs").start()

Solution

  • Each time you call .writeStream...start() you create a new, independent streaming query.

    This means that for each output sink you define, Spark will read the input source again and reprocess the DataFrame, which is why your UDF runs once per query.

    If you want to read and process the data only once and then write it to multiple sinks, you can use the foreachBatch sink as a workaround:

    inData = spark.readStream.format("eventhubs").load()
    udfdata = inData.select(from_json(myudf("column"), schema).alias("result")).select("result.*")

    def filter_and_output(udfdata, batchId):
        # At this point udfdata is a static batch DataFrame, no longer a streaming DataFrame
        udfdata.cache()

        filter1 = udfdata.filter("column == 'filter1'")
        filter2 = udfdata.filter("column == 'filter2'")

        # write filter1 to both sinks
        filter1.write.format("delta").mode("append").save(table1)
        filter1.write.format("eventhubs").save()

        # write filter2 to both sinks
        filter2.write.format("delta").mode("append").save(table2)
        filter2.write.format("eventhubs").save()

        udfdata.unpersist()

    udfdata.writeStream.foreachBatch(filter_and_output).start()


    You can learn more about foreachBatch in the Spark Structured Streaming documentation.
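
    In practice you will usually also want to give the foreachBatch query a checkpoint location so it can recover its progress after a restart. A minimal sketch, assuming a placeholder checkpoint path:

    # Same query as above, started with a checkpoint location.
    # "/mnt/checkpoints/multi_sink_query" is a placeholder path.
    query = (udfdata.writeStream
                    .foreachBatch(filter_and_output)
                    .option("checkpointLocation", "/mnt/checkpoints/multi_sink_query")
                    .start())

    query.awaitTermination()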

    To answer your questions:

    1. If you use foreachBatch, your data will be processed only once and you will have the same UniqueId for all sinks.
    2. Yes, each independent streaming query executes the UDF again.
    3. Using foreachBatch will solve the issue.