kafka-consumer-api, apache-kafka-streams, kafka-producer-api

Kafka Streams getting duplicate records even with exactly_once enabled


I am using Kafka Streams to receive some data, and I noticed that it gets more records than I sent. Below are my settings on the consumer side.

At Consumer

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-user-process");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettings.getKafkaBroker());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaSettings.getTotalStreamThreadCount());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put("isolation.level", "read_committed");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "600");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
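
For context, here is roughly how these properties drive my Kafka Streams application (a simplified sketch, not my real topology class: the sink topic name and the String serdes are placeholders, but the peek logging step is what produces the "Data at Stream" lines shown further below):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    StreamsBuilder builder = new StreamsBuilder();

    // Simplified: the real code deserializes MyPojo; String serdes are used only for this sketch.
    builder.stream("usertopic", Consumed.with(Serdes.String(), Serdes.String()))
           // This peek is what writes the "Data at Stream" DEBUG lines in the logs below.
           .peek((key, value) -> System.out.println("Data at Stream key: " + key + " value: " + value))
           // Sink topic name is a placeholder.
           .to("usertopic-out", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));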

props at producer side

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybootstrapservers");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientnoveluser");

    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1500);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, (new GenericSerializer<MyPojo>()).getClass().getName());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandom.class);
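
The myProducerInstance.createProducer() call used in the code below essentially just builds a KafkaProducer from these props; a simplified sketch of that helper (the class shape here is illustrative, only the method name comes from my code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    public class MyProducerInstance {

        private final Properties props;

        public MyProducerInstance(Properties props) {
            this.props = props;
        }

        // Simplified: builds a producer from the configuration shown above.
        public Producer<String, MyPojo> createProducer() {
            return new KafkaProducer<>(props);
        }
    }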

Below is my producer code

public void producerSendData(String key, MyPojo message) throws Exception {

        final Producer<String, MyPojo> producer = myProducerInstance.createProducer();
        final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", key, message);
        try {
            producer.send(record, new ProducerCallback());
            producer.flush();
        }
        finally {
            // the producer is not closed here
        }

    }

My topic has 10 partitions in total, and my producer uses round-robin-style partitioning logic, writing equally to all partitions. For testing, 10 different threads on the producer side write 1000 messages each.
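
The test harness is roughly the following (simplified; the single-argument MyPojo constructor and the key/value naming are only for illustration, mirroring what shows up in the stream logs):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    ExecutorService pool = Executors.newFixedThreadPool(10);
    for (int t = 0; t < 10; t++) {
        final int threadId = t;
        pool.submit(() -> {
            // Each of the 10 threads sends 1000 messages, 10000 in total.
            for (int i = 0; i < 1000; i++) {
                producerSendData("key-" + threadId, new MyPojo("message-" + i));
            }
            return null;
        });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.MINUTES);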

On the consumer side, I sometimes get more messages than I sent; for example, I receive 10867 messages when I sent only 10000.

I noticed that I get these duplicates whenever a stream thread rebalances and rejoins, with messages like the ones below.

2019-07-14T00:11:06,043 DEBUG [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6] c.j.m.s.UserKafkaStreamTopology: Data at Stream
key: key-29 value: {"userId":"message-468","data":null,"data1":null,"data3":null}
**2019-07-14T00:11:06,043 INFO [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8] o.a.k.c.c.KafkaConsumer: [Consumer clientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
2019-07-14T00:11:06,043 INFO [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8] o.a.k.s.p.i.StreamThread$RebalanceListener: stream-thread [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8] partition revocation took 16 ms.
    suspended active tasks: [0_6]
    suspended standby tasks: []
2019-07-14T00:11:06,044 INFO [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8] o.a.k.c.c.i.AbstractCoordinator: [Consumer clientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-consumer, groupId=streams-user-process] (Re-)joining group**
2019-07-14T00:11:06,043 DEBUG [streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6] c.j.m.s.UserKafkaStreamTopology: Data at Stream
key: key-30 value: {"userId":"message-569","data":null,"data1":null,"data3":null}

I need help understanding why I receive more records than I sent even though I have enabled exactly_once.


Solution

  • Exactly once for stream processing guarantees that for each received record, its processed results will be reflected once, even under failures.

    Exactly_once in the context of Kafka is a concept that applies to Kafka Streams; keep in mind that Kafka Streams is designed to read from topic(s) and produce to topic(s).

    Rephrasing in the Kafka Streams world: Exactly once means that the processing of any input record is considered completed if and only if state is updated accordingly and output records are successfully produced once.

    In your specific case, it looks like log lines such as key: key-30 value: {"userId":"message-569","data":null,"data1":null,"data3":null} are produced by a peek method in your topology.

    You should instead check the sink topic to see whether it contains the expected number of events; a sketch of such a check follows at the end of this answer.

    If, for any reason, your Kafka Streams app cannot publish a message to the sink topic, it is normal for the incoming message to be consumed and processed again in order to produce an output message and honor the "exactly once" contract. That is why the same message can appear several times in your log.

    You may find more detailed information at https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
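
    For example, one way to do that check (a sketch; the topic name, group id, and String deserializers are placeholders for your own) is to count the records in the sink topic with a standalone consumer running with isolation.level=read_committed, so that only records from committed transactions are counted:

        import java.time.Duration;
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        Properties verifyProps = new Properties();
        verifyProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybootstrapservers");
        verifyProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sink-topic-verifier");
        verifyProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Only records from committed transactions count toward the exactly-once guarantee.
        verifyProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        verifyProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        verifyProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        long total = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(verifyProps)) {
            consumer.subscribe(Collections.singletonList("usertopic-out")); // your sink topic
            ConsumerRecords<String, String> batch;
            // Poll until a poll comes back empty; good enough for a one-off count.
            while (!(batch = consumer.poll(Duration.ofSeconds(5))).isEmpty()) {
                total += batch.count();
            }
        }
        System.out.println("Committed records in sink topic: " + total);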