scala, apache-kafka, streaming, kafka-consumer-api

How to get the latest value from a kafka Stream


I am fairly new to Kafka and streaming. My requirement is that every time I run the Kafka producer and consumer, the consumer should get only the message produced in that run.

Below is the basic code for Producer and consumer

Producer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("test", "key", jsonstring)
producer.send(record)
producer.close()

Consumer

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "13")
// Values are plain JSON strings, so the type parameters must match
// the StringDeserializer configured above.
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList("test"))
while (true) {
  val records = consumer.poll(1000).asScala
  for (data <- records.iterator) {
    println(data.value())
  }
}

The input JSON I am using is the following:

{
  "id": 1,
  "Name": "foo"
}

Now the problem I am facing is that each time I run the program I get duplicated values. For example, if I run the code twice, the consumer output looks like this:

{
  "id": 1,
  "Name": "foo"
}

{
  "id": 1,
  "Name": "foo"
}

What I want is that on each run of the program, only the message produced by that run of the producer is consumed and printed.

I have tried a few things, like changing the consumer offset property to latest:

props.put("auto.offset.reset", "latest")

I also tried the approach mentioned in "How can I get the LATEST offset of a kafka topic?" but it did not work for me.

Can you please suggest any alternatives?


Solution

  • Consumers read messages from a topic partition in sequential order. When you call poll(), it returns records written to Kafka that consumers in your group have not read yet. Kafka tracks the consumption offset of each partition so it knows where to resume after a restart. Consumers maintain their partition offsets in the topic __consumer_offsets by committing them.

    A commit is the action of updating the consumer's current position in __consumer_offsets.

    When a consumer restarts, it reads the latest committed offset of each partition and continues consuming from there.
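The mechanics above can be sketched with a toy model in plain Scala (this is not the Kafka API, just an illustration of how a committed offset lets a restarted consumer resume without re-reading):

```scala
// Toy model (not the Kafka API): one partition is an append-only log,
// and the committed offset records how far the group has read.
val partition = Vector("m0", "m1", "m2", "m3", "m4")
var committed = 0 // nothing committed yet

// First run: poll two records, process them, then commit.
val firstRun = partition.slice(committed, committed + 2)
committed += firstRun.size // commit: position is now 2

// Second run (after a restart): resume from the committed offset,
// so the first two records are not re-delivered.
val secondRun = partition.drop(committed)

println(firstRun)  // Vector(m0, m1)
println(secondRun) // Vector(m2, m3, m4)
```

This is exactly why the asker sees duplicates: without a successful commit, every run starts from the same position.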

    You can control commits in two ways:

    1. Auto-commit, by setting enable.auto.commit to true together with a commit interval:

    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    

    2. Manual commit:

    consumer.commitAsync() // asynchronous commit
    // or
    consumer.commitSync()  // synchronous commit
    

    If you fail to commit, the consumer will restart from the last committed position.

    auto.offset.reset:

    When a consumer group is first created (a group id that has never committed any offsets), the initial position of each assigned partition is set according to the configurable offset reset policy auto.offset.reset. After that, the consumer keeps consuming incrementally and uses commits (as explained above) to track the latest consumed message.

    Note: If the consumer crashes before any offset has been committed, the consumer that takes over its partitions will use the reset policy.
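That precedence can be illustrated with a small hypothetical helper (not part of the Kafka API): a committed offset always wins, and auto.offset.reset is only a fallback for groups with no committed position:

```scala
// Hypothetical helper, for illustration only: how the starting offset
// is chosen. A committed offset takes precedence; the reset policy is
// consulted only when no committed offset exists.
def startPosition(committed: Option[Long], logEndOffset: Long, resetPolicy: String): Long =
  committed.getOrElse(resetPolicy match {
    case "earliest" => 0L           // start from the beginning of the partition
    case "latest"   => logEndOffset // start from the end, skipping old messages
    case other      => throw new IllegalArgumentException(other)
  })

println(startPosition(None, 5L, "earliest"))   // 0 (new group reads everything)
println(startPosition(None, 5L, "latest"))     // 5 (new group skips old messages)
println(startPosition(Some(3L), 5L, "latest")) // 3 (committed offset wins)
```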

    So in your case:

    1. Either commit offsets manually or set enable.auto.commit to true for auto-commit.
    2. Always use the same group id; if you change the group id, Kafka treats it as a different consumer and uses auto.offset.reset to assign the starting offset.
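Putting both points together for your consumer, a configuration along these lines should stop old messages from being replayed (the auto-commit interval is just an example value; tune it for your needs):

```scala
import java.util.Properties

// Sketch of the asker's consumer configuration with a stable group id
// and auto-commit enabled, so offsets persist between runs.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("group.id", "13")                  // keep this stable across runs
props.put("enable.auto.commit", "true")      // commit consumed offsets automatically
props.put("auto.commit.interval.ms", "1000") // example interval
props.put("auto.offset.reset", "earliest")   // only used when no committed offset exists
// new KafkaConsumer[String, String](props) will now resume from the last
// committed offset on every run instead of replaying old messages.
```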

    Reference: https://www.confluent.io/resources/kafka-the-definitive-guide/