The following is a Scala snippet from an error handler I'm writing for a streaming application. It uses Akka Streams to consume messages ('errormsg') from a Kafka topic and write them to a table in Kudu.
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = Consumer.committableSource(
  consumerSettings,
  Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
  val bytes: Array[Byte] = msg.record.value()
  val errormsg = (bytes.map(_.toChar)).mkString
  new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
})
cdrs.to(new ErrorKuduSink(session, table)).run()
I'd like to reuse the variable 'errormsg' further down, in a few lines that email me that message.
How do I make 'errormsg' visible outside the map block (or, alternatively, incorporate the snippet below into it) so that the variable is in scope?
send a new Mail (
  from = ("[email protected]"),
  to = "[email protected]",
  subject = "Encountered error",
  message = errormsg
)
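One simple fix is to declare a MutableList outside the map and append each message to it, so the messages remain in scope afterwards. Keep in mind that the list is only filled while the stream is running, so read it after messages have been processed: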
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = Consumer.committableSource(
  consumerSettings,
  Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
// Import only the class that is needed rather than the whole mutable package
import scala.collection.mutable.MutableList
// Declared outside the map so it stays in scope after the stream is defined
val errorMessages: MutableList[String] = new MutableList
val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
  val bytes: Array[Byte] = msg.record.value()
  val errormsg = (bytes.map(_.toChar)).mkString
  // Record the message so it can be emailed later
  errorMessages += errormsg
  new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
})
cdrs.to(new ErrorKuduSink(session, table)).run()
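As an alternative that avoids shared mutable state, the mail can be sent where 'errormsg' is already in scope, that is, inside the function passed to map. The following is only a minimal sketch under some assumptions: the Errors case class is a simplified stand-in for the class in the question (its field names are made up), sendMail is a hypothetical callback that would wrap the "send a new Mail (...)" call, and the payload is assumed to be UTF-8 text.

```scala
import java.nio.charset.StandardCharsets

// Simplified stand-in for the Errors class in the question; field names are assumptions.
final case class Errors(
  code: Int,
  filename: String,
  cdr: String,
  cdrType: String,
  flag: Int,
  message: String)

object ErrorHandling {
  // Decodes one Kafka payload, passes the text to `sendMail` (a hypothetical
  // callback wrapping the mail-sending call), and returns the row for the Kudu sink.
  def toError(bytes: Array[Byte], sendMail: String => Unit): Errors = {
    // Assumption: the payload is UTF-8 text; change the charset if the producer differs.
    val errormsg = new String(bytes, StandardCharsets.UTF_8)
    // errormsg is in scope here, so no outer variable is needed.
    sendMail(errormsg)
    Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  }
}

// Inside the stream from the question this could be used as (not run here):
//   val cdrs = kafkaMessages.map(msg => ErrorHandling.toError(msg.record.value(), sendMail))
```

Because the callback runs once per element inside map, a slow or blocking mail call would hold up the stream. If that becomes a problem, wrapping the send in a Future and using mapAsync instead of map is one option worth considering.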