I have a set of SQL rules which I need to apply to a streaming dataframe inside foreachBatch(). After applying those rules, the resultant/filtered dataframe should be written to multiple destinations like Delta and Cosmos DB.
Below is what I have tried:
Using the static dataframe that foreachBatch() passes in, I am trying to create a temp view as below.
df.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  microBatchOutputDF.createOrReplaceTempView("testTable")
}
But while running the code, it fails with the error: table or view 'testTable' not found.
Is it possible to create a temp table/view from a static dataframe in Spark Structured Streaming?
And how can I write to multiple sinks?
From the comments clarifying the OP's question:
"I have a set of SQL rules which I need to apply on the dataframe inside foreachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."
foreachBatch allows you to apply arbitrary batch-style logic to the output dataframe of each micro-batch of a streaming query.
In your case I understand you want to apply different transformations to your streaming dataframe and write it to multiple locations. You can do it like below:
df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // persist the dataframe because it is reused multiple times below
  batchDF.persist()

  // apply your SQL logic using `selectExpr` or just the DataFrame API
  val deltaBatchDf = batchDF.selectExpr("")
  val cosmosBatchDf = batchDF.selectExpr("")

  // write to multiple sinks like you would do with batch DataFrames;
  // add more locations if required
  deltaBatchDf.write.format("delta").options(...).save(...)
  cosmosBatchDf.write.format("cosmos").options(...).save(...)

  // free the memory again
  batchDF.unpersist()
}.start()
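To answer the first part of your question: yes, you can also register the micro-batch dataframe as a temp view inside foreachBatch and apply your rules as plain SQL. The view is registered in the session that owns the micro-batch, so you have to query it through batchDF.sparkSession rather than some other SparkSession, which is the usual cause of the "table or view 'testTable' not found" error. A minimal sketch, where the filter rule and the output path are only illustrative:

import org.apache.spark.sql.DataFrame

df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // register the micro-batch output as a temp view
  batchDF.createOrReplaceTempView("testTable")

  // run the SQL rules through the micro-batch's own session;
  // querying from a different session will not find the view
  val filteredDf = batchDF.sparkSession.sql(
    "SELECT * FROM testTable WHERE amount > 0") // illustrative rule

  // write the filtered result to a sink (illustrative path)
  filteredDf.write.format("delta").mode("append").save("/tmp/delta/filtered")
}.start()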