scala streaming databricks

How to run an if else statement in Scala in Databricks streaming


I am new to both Scala and Databricks streaming. I am reading streamed events into a dataframe, and I want to use an if-else statement to trigger a different notebook depending on whether the dataframe is empty. The simple code below (and variations of it)

if (finalDF.isEmpty) {
  print("0")
} else {
  print("1")
}

consistently results in the following error:

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs

How can I incorporate writeStream.start() into the above code? Or, how can I evaluate the dataframe content and based on that, take one or another action, given that the dataframe is populated by streaming events into it?


Solution

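  • A streaming DataFrame cannot be evaluated directly with actions such as isEmpty or count(); the check has to run inside the streaming query. With foreachBatch, each micro-batch is passed to a function as a regular DataFrame, so if-else logic can be applied there. I tested the code below, and it works for deciding actions based on event contents.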

    import org.apache.spark.sql.DataFrame

    // Runs once per micro-batch; inside foreachBatch, df is a static DataFrame,
    // so actions such as count() are allowed.
    def myfunc(df: DataFrame): Unit = {
        val test1 = df.filter($"col" === "test1")
        val test2 = df.filter($"col" === "test2")
        if (test1.count() > 0) {
            dbutils.notebook.run("some_notebook", 60)  // 60 = timeout in seconds
        }
        if (test2.count() > 0) {
            dbutils.notebook.run("another_notebook", 60)
        }
    }

    df.writeStream.foreachBatch((df: DataFrame, batchID: Long) => myfunc(df)).start()
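
    For the original question (branching on whether the dataframe is empty), the same pattern might look like the sketch below. It assumes finalDF is the streaming DataFrame from the question and that dbutils is available, as in a Databricks notebook. The notebook names empty_notebook and nonempty_notebook and the helpers chooseNotebook and handleBatch are hypothetical placeholders, not part of the tested code above.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical notebook names; replace with real notebook paths.
def chooseNotebook(batchIsEmpty: Boolean): String =
  if (batchIsEmpty) "empty_notebook" else "nonempty_notebook"

// Inside foreachBatch, each micro-batch is a static DataFrame,
// so actions such as isEmpty are allowed here.
def handleBatch(batchDF: DataFrame, batchId: Long): Unit = {
  val target = chooseNotebook(batchDF.isEmpty)
  dbutils.notebook.run(target, 60) // 60-second timeout, as in the answer above
}

// finalDF is assumed to be the streaming DataFrame from the question.
val query = finalDF.writeStream
  .foreachBatch((batchDF: DataFrame, batchId: Long) => handleBatch(batchDF, batchId))
  .start()
```

    Depending on the source and query, Spark may skip micro-batches when no new data arrives, so the empty branch might only run when upstream filtering leaves a batch with no rows.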