I am new both to Scala and to Databricks streaming. I am reading streamed events into a dataframe and I want to use an if-else statement to trigger a different notebook based on whether the dataframe is empty or not. The simple code below (and variations of it)
if (finalDF.isEmpty) {
  print("0")
} else {
  print("1")
}
consistently results in the following error:
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs
How can I incorporate writeStream.start() into the above code? Or, how can I evaluate the dataframe content and based on that, take one or another action, given that the dataframe is populated by streaming events into it?
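I tested this code and it works as a way to introduce an if-else and decide actions based on event contents. The key is foreachBatch: it passes each micro-batch to the function as a regular (non-streaming) DataFrame, so actions such as count() no longer raise the AnalysisException.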
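import org.apache.spark.sql.DataFrame

// Called once per micro-batch; the batch is a static DataFrame, so count() is allowed here.
def myfunc(df: DataFrame): Unit = {
  // $"col" comes from spark.implicits._ (import it if your environment does not already).
  val test1 = df.filter($"col" === "test1")
  val test2 = df.filter($"col" === "test2")
  if (test1.count() > 0) {
    dbutils.notebook.run("some_notebook", 60) // 60 = timeout in seconds
  }
  if (test2.count() > 0) {
    dbutils.notebook.run("another_notebook", 60)
  }
}

df.writeStream.foreachBatch((batchDF: DataFrame, batchID: Long) => myfunc(batchDF)).start()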
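For the empty versus non-empty check from the question, the same foreachBatch pattern can be sketched roughly as below. This is a minimal sketch, not something I ran on Databricks. It assumes Spark 2.4 or later (for foreachBatch and Dataset.isEmpty) and uses the built-in rate source on a local session in place of the Event Hubs stream. The notebook names and the chooseNotebook / handleBatch helpers are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object EmptyBatchSketch {
  // Hypothetical notebook names; replace with your own paths.
  def chooseNotebook(batchIsEmpty: Boolean): String =
    if (batchIsEmpty) "notebook_for_empty_batch" else "notebook_for_non_empty_batch"

  // Inside foreachBatch the batch is a static DataFrame, so isEmpty is allowed.
  def handleBatch(batch: DataFrame, batchId: Long): Unit = {
    val target = chooseNotebook(batch.isEmpty)
    // On Databricks this is where dbutils.notebook.run(target, 60) would go.
    println(s"batch $batchId -> $target")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; a Databricks notebook already provides `spark`.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("empty-batch-sketch")
      .getOrCreate()

    // The rate source generates rows continuously; it stands in for the real event stream.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // An explicitly typed function value avoids overload ambiguity on foreachBatch.
    val handler: (DataFrame, Long) => Unit = (batch, id) => handleBatch(batch, id)

    val query = stream.writeStream.foreachBatch(handler).start()
    query.awaitTermination(10000) // let it run for about 10 seconds
    query.stop()
    spark.stop()
  }
}
```

On Databricks you would drop the local SparkSession, apply handler to your own streaming DataFrame (finalDF in the question), and replace the println with the dbutils.notebook.run call.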