scala, apache-kafka, akka-stream

Changing the scope of a local variable in Scala


The following Scala snippet is part of an error handler I'm building for a streaming application. It uses Akka Streams to consume messages ('errormsg') from a Kafka topic and write them to a table in Kudu.

val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] =
  Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))

val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map { msg =>
  val bytes: Array[Byte] = msg.record.value()
  val errormsg = bytes.map(_.toChar).mkString
  new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
}

cdrs.to(new ErrorKuduSink(session, table)).run()

I'd like to re-use the variable 'errormsg' further down, as part of a few lines that email me that message.

How do I make 'errormsg' visible outside the map block (or alternatively incorporate the code snippet below into it), so that the variable is in scope where the mail is sent?

  send a new Mail (
    from = ("[email protected]"),
    to = "[email protected]",
    subject = "Encountered error",
    message = errormsg
  )

Solution

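  • An easy fix is to collect the messages in a MutableList declared outside the map closure. Each 'errormsg' is appended as it is processed, so the values remain accessible after the closure: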

    import scala.collection.mutable.MutableList

    val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] =
      Consumer.committableSource(
        consumerSettings,
        Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))

    // Declared outside the map closure, so it stays in scope after the stream is defined.
    val errorMessages: MutableList[String] = new MutableList[String]

    val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map { msg =>
      val bytes: Array[Byte] = msg.record.value()
      val errormsg = bytes.map(_.toChar).mkString
      // Side effect: record the message so it can be read outside this closure.
      errorMessages += errormsg
      new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
    }

    // The list is only filled while the stream runs, as messages arrive.
    cdrs.to(new ErrorKuduSink(session, table)).run()
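
The list-based fix relies on a side effect inside the closure. A possible alternative, sketched below under several assumptions, is to skip the shared variable entirely: each element produced by the map already carries the message, so a later step can read it from the element. In this sketch the `Errors` case class and its field names are hypothetical stand-ins for the class in the question, `sendMail` is a hypothetical stub that only records what would be sent, a plain `Iterator` stands in for the Kafka-backed `Source`, and the payload is assumed to be UTF-8 text (hence `new String(bytes, UTF_8)` rather than a per-byte `toChar`).

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the Errors class in the question; field names are assumptions.
final case class Errors(id: Int, fileName: String, cdr: String, cdrType: String, code: Int, message: String)

object ErrorScopeSketch {
  // Hypothetical mail stub: records what would be sent instead of sending anything.
  val sentMails: ListBuffer[String] = ListBuffer.empty[String]

  def sendMail(subject: String, message: String): Unit =
    sentMails += s"$subject: $message"

  // Decode the payload (assumed to be UTF-8 text) and wrap it in an Errors value.
  def toError(bytes: Array[Byte]): Errors = {
    val errormsg = new String(bytes, StandardCharsets.UTF_8)
    Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  }

  def main(args: Array[String]): Unit = {
    // An Iterator stands in for the stream of Kafka record values.
    val payloads = Iterator("disk full", "bad record").map(_.getBytes(StandardCharsets.UTF_8))
    val cdrs = payloads.map(toError)

    // errormsg itself is out of scope here, but every element still carries the text.
    cdrs.foreach(err => sendMail("Encountered error", err.message))

    sentMails.foreach(println)
  }
}
```

In the stream from the question, the equivalent would presumably be an extra stage on `cdrs`, for example a `map` that sends the mail and returns the element unchanged, placed before `.to(new ErrorKuduSink(session, table))`. That only works if the real `Errors` class exposes the message, which the question does not show.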