scala, apache-kafka, akka-stream, reactive-kafka, akka-kafka

Reactive-Kafka Stream Consumer: Dead letters occurred


I am trying to consume messages from Kafka using Akka's reactive-kafka library. One message is printed, and after that I get this log entry:

[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

This is the code I am running:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import play.api.libs.json._
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object CommittableSourceConsumerMain extends App {

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()
  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val done =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        val record = msg.record.value()
        val data = Json.parse(record)

        val recordType = data \ "data" \ "event" \ "type"
        val actualData = data \ "data" \ "row"

        if (recordType.as[String] == "created") {
          "Some saving logic"
        } else {
          "Some logic"
        }
        msg.committableOffset.commitScaladsl()
      }
      .runWith(Sink.ignore)
}

Solution

  • I finally figured out the solution. A runtime exception inside the stream fails it, so the stream terminates immediately and the materialized Future (done) completes with a failure. The exception itself was not shown anywhere in the output. To see it, register a failure callback on the future:

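    import scala.util.control.NonFatal
    import system.dispatcher // execution context for the callback

    // onFailure is deprecated since Scala 2.12; done.failed.foreach(println) is the replacement
    done.onFailure {
      case NonFatal(e) => println(e)
    }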
    

    The exception was in the if-else block. You can also attach a supervision strategy (ActorAttributes.supervisionStrategy) so that the stream resumes instead of stopping when an exception occurs.
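
    Resuming the stream after an exception, as mentioned above, could look roughly like the sketch below. It is not the original consumer: to stay runnable without a Kafka broker it uses a plain in-memory Source, and the names ResumeOnErrorSketch, decider and parseAll are made up for illustration. It assumes only akka-stream on the classpath and the same ActorMaterializer-era API used in the question.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical demo object, not part of the original question.
object ResumeOnErrorSketch {

  // Print the failure, then drop the offending element and keep the stream running.
  val decider: Supervision.Decider = {
    case NonFatal(e) =>
      println(s"Skipping element: $e")
      Supervision.Resume
  }

  // Parses each input as an Int; inputs that fail to parse are skipped, not fatal.
  def parseAll(inputs: List[String]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ResumeOnErrorSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(inputs)
        .map(_.toInt) // throws NumberFormatException for non-numeric input
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(parseAll(List("1", "oops", "3")))
}
```

    In the Kafka consumer the same withAttributes call would go after the mapAsync stage. Keep in mind that Resume drops the failing element, so with the code in the question the offset of that message would not be committed by that element.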