apache-spark, spark-structured-streaming

How to write streaming DataFrame into multiple sinks in Spark Structured Streaming


I have a set of SQL rules which I need to apply to a streaming DataFrame inside foreachBatch(). After applying those rules, the resulting/filtered DataFrame should be written to multiple destinations such as Delta and Cosmos DB.

Below is what I have tried: using the static DataFrame passed into the foreachBatch() function, I am trying to create a temp view as shown below.

df.writeStream
  .format("delta")
  .foreachBatch(writeToDelta _)
  .outputMode("update")
  .start()

def writeToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
    microBatchOutputDF.createOrReplaceTempView("testTable")
}

But while running the code, it fails with an error saying table or view 'testTable' not found.

Is it possible to create a temp table/view from a static DataFrame in Spark Structured Streaming?

Or how can I write to multiple sinks?


Solution

  • From the comments clarifying the OP's question:

    "I have a set of SQL rules which I need to apply on the dataframe inside forEachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."

    The foreachBatch sink allows you to:

    • Reuse existing batch data sources
    • Write to multiple locations

    In your case, I understand you want to apply different transformations to your streaming DataFrame and write the results to multiple locations. You can do it as shown below:

    df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

      // persist the DataFrame because it is reused multiple times below
      batchDF.persist()

      // apply your SQL rules using `selectExpr` or the DataFrame API
      // (replace the placeholder expressions with your actual rules)
      val deltaBatchDf = batchDF.selectExpr("...")
      val cosmosBatchDf = batchDF.selectExpr("...")

      // write to multiple sinks exactly as you would with batch DataFrames;
      // add more locations if required
      deltaBatchDf.write.format("delta").options(...).save(...)
      cosmosBatchDf.write.format("cosmos").options(...).save(...)

      // release the cached data
      batchDF.unpersist()
    }.start()
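
    To answer the temp view part of the question: createOrReplaceTempView does work on the micro-batch DataFrame inside foreachBatch, but the view is registered on the SparkSession attached to that DataFrame, so it should be queried through microBatchOutputDF.sparkSession. Querying it through a different session (for example the global notebook session) is a common cause of the "table or view 'testTable' not found" error. Below is a minimal sketch of that pattern; the function name writeToSinks and the filter expression are placeholders for your own rules:

    import org.apache.spark.sql.DataFrame

    def writeToSinks(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
      // register the micro-batch as a temp view so the SQL rules can reference it
      microBatchOutputDF.createOrReplaceTempView("testTable")

      // query through the session the micro-batch DataFrame belongs to;
      // using another session can lead to "table or view not found"
      val session = microBatchOutputDF.sparkSession
      val filteredDf = session.sql("SELECT * FROM testTable WHERE 1 = 1") // placeholder rule

      // then write the filtered result to the sinks as in the snippet above
      // filteredDf.write.format("delta").options(...).save(...)
      // filteredDf.write.format("cosmos").options(...).save(...)
    }

    // attach it to the stream as usual:
    // df.writeStream.foreachBatch(writeToSinks _).outputMode("update").start()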