
Spark Structured Streaming exception handling


I am reading data from an MQTT streaming source with the Spark Structured Streaming API:

import java.sql.Timestamp
import spark.implicits._ // encoder for (String, Timestamp); assumes `spark` is the SparkSession

val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "password")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000").as[(String, Timestamp)]

I then convert the streaming data to the case class Employee:

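// json4s (Jackson backend assumed)
import org.json4s._
import org.json4s.jackson.JsonMethods._

case class Employee(Name: String, Department: String)

// Parse each MQTT payload (the String in the tuple) into an Employee
val ds = lines.map { row =>
  implicit val formats: Formats = DefaultFormats
  parse(row._1).extract[Employee]
}

// ....some transformations (resulting in df)

df.writeStream
  .outputMode("append")
  .format("es")
  .option("es.resource", "spark/employee")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()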

Some messages in the queue had a different structure than the Employee case class; for example, some required fields were missing. My streaming job failed with a field-not-found exception.

I would like to handle such exceptions and also send an alert notification when they happen. I tried wrapping the code in a try/catch block:

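case class ErrorMessage(row: String)

try {
  // ... the readStream / map / writeStream code above ...
} catch {
  case e: Exception =>
    val ds = lines.map { row =>
      implicit val formats: Formats = DefaultFormats
      parse(row._1).extract[ErrorMessage]
    }
    // foreach is a batch action; calling it on the streaming Dataset `lines`
    // is what raises the AnalysisException below
    lines.foreach(row => sendErrorMail(row._1))
}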

This fails with the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt

Any help on this would be appreciated.


Solution

  • I think you should instead use the StreamingQuery object returned by start(), as described in the Spark Structured Streaming documentation. Something like:

    val query = df.writeStream. ... .start()
    try {
      // If the query has terminated with an exception, awaitTermination() rethrows it
      // (as a StreamingQueryException), so it can be caught here.
      query.awaitTermination()
    } catch {
      case ex: Exception => /* code to send mail */
    }
    

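    Implementing your own foreach sink (a ForeachWriter) for the alerts can add overhead: its open() and close() methods run for every partition in every trigger, so connections are opened and closed frequently.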
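The key point of the answer is that start() returns immediately and a failure on a malformed record only surfaces when you wait on the query. As a Spark-free illustration of that control flow, here is a minimal sketch that uses scala.concurrent.Future as a stand-in for the StreamingQuery. The names startQuery and runAndAlert are hypothetical, the "missing field" check is a toy substitute for the json4s extraction, and sendErrorMail only records the message instead of sending mail. In real Spark code, the Future corresponds to the query returned by start() and Await.result corresponds to query.awaitTermination().

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object AwaitFailureSketch {
  // Hypothetical stand-in for the question's sendErrorMail helper:
  // it only records the alert so the behaviour can be checked.
  val alerts = scala.collection.mutable.ListBuffer.empty[String]
  def sendErrorMail(msg: String): Unit = alerts += msg

  // Stand-in for writeStream...start(): the work runs on another thread,
  // so a bad record fails the background job, not this call.
  def startQuery(records: Seq[String]): Future[Int] = Future {
    records.foreach { r =>
      // toy check standing in for extract[Employee] failing on a missing field
      if (!r.contains("Department"))
        throw new IllegalArgumentException(s"missing field in: $r")
    }
    records.size
  }

  // Stand-in for query.awaitTermination() inside try/catch: the failure from
  // the background thread is rethrown here, where the driver can alert on it.
  def runAndAlert(records: Seq[String]): Try[Int] = {
    val query = startQuery(records) // returns immediately, even for bad input
    try {
      Success(Await.result(query, 10.seconds))
    } catch {
      case ex: Exception =>
        sendErrorMail(ex.getMessage) // alert from the caught exception
        Failure(ex)
    }
  }

  def main(args: Array[String]): Unit = {
    println(runAndAlert(Seq("""{"Name":"a","Department":"x"}""")))
    println(runAndAlert(Seq("""{"Name":"b"}""")))
    println(alerts)
  }
}
```

The same reasoning explains the AnalysisException in the question: the catch block runs on the driver after the query has already failed, so it should build the alert from the caught exception rather than run a batch action such as foreach on the streaming Dataset.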