I reading data from a MQTT streaming source with Spark Structured Streaming API.
val lines:= spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "Employee")
.option("username", "username")
.option("password", "passwork")
.option("clientId", "employee11")
.load("tcp://localhost:8000").as[(String, Timestamp)]
I convert the streaming data to case class Employee
case class Employee(Name: String, Department: String)
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[Employee]
}
....some transformations
df.writeStream
.outputMode("append")
.format("es")
.option("es.resource", "spark/employee")
.option("es.nodes", "localhost")
.option("es.port", 9200)
.start()
.awaitTermination()
Now there were some messages in the queue which had different structure than Employee
case class. Lets say some required columns were missing. My streaming job failed with field not found exception.
Now I will like to handle such exception and also will like to send an alert notification for the same. I tried putting a try/catch block.
case class ErrorMessage(row: String)
catch {
case e: Exception =>
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[ErrorMessage]
}
val error = lines.foreach(row => {
sendErrorMail(row._1)
})
}
}
Got the exception that Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt
Any help on this will be appreciated.
I think you should rather use the return object of the start()
method as described in Spark streaming doc. Something like:
val query = df.writeStream. ... .start()
try {
//If the query has terminated with an exception, then the exception will be thrown.
query.awaitTermination()
catch {
case ex: Exception => /*code to send mail*/
}
Implementing your own foreach sink can cause overhead with frequent opening and closing connections.