
Kafka Streams: Commit doesn't happen


I am new to Kafka Streams and I am experimenting with the behaviour of Kafka Streams in case of timeouts.

Here is the scenario I am testing using Processor API:

  • My Kafka Streams app consumes from a Kafka topic (String key, String message) and writes to a Kafka topic (String key, String message)

  • I have set the Consumer Config parameter max.poll.interval.ms to 60000 ms.

  • My process method looks like this:

    public void process(String key, String value) {
        System.out.println("the key is : " + key);
        LocalDateTime start = LocalDateTime.now();
        System.out.println("startTime:" + dtf.format(start));
        if (key.startsWith("12345678")) {
            try {
                // simulate processing that takes longer than max.poll.interval.ms
                Thread.sleep(80000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("done sleeping");
        LocalDateTime end = LocalDateTime.now();
        System.out.println("endTime:" + dtf.format(end));
        System.out.println("Offset*****" + context.offset()
                + " partitionId****" + context.partition()
                + "taskId*****" + context.taskId()
                + "javaThreadId*******" + Thread.currentThread().getId()
                + " value****" + value);
    }
    
  • All other configurations are set to default.

  • I am trying to see how the app behaves if the processing time exceeds max.poll.interval.ms (a sketch of the overall setup follows this list).
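
For context, this is roughly how such a Processor API setup could be wired together. The sketch below is an assumption rather than code from my app: the application id, broker address, topic names, and the SleepyProcessor class (the processor holding the process method above) are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class TimeoutTestApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timeout-test-app");      // placeholder app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder brokers
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
            // consumer-level settings such as max.poll.interval.ms are passed with the consumer prefix
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 60000);

            Topology topology = new Topology();
            topology.addSource("source", "input-topic")                              // placeholder topic names
                    .addProcessor("sleepy-processor", SleepyProcessor::new, "source")
                    .addSink("sink", "output-topic", "sleepy-processor");

            new KafkaStreams(topology, props).start();
        }
    }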

This is what happens: on the first attempt, the app consumes the message from the Kafka topic, calls process(), and starts to sleep. After 60000 ms it calls process() again for the same record, without throwing any exception, but this second invocation exits the sleep after only about 20000 ms, prints "done sleeping", and posts the message to the output topic. After that it consumes the same message from the same offset again, without committing, and this repeats in a loop.

Sample output:

    the key is : 12345678
    startTime:2018/07/09 07:34:25
    the key is : 12345678
    startTime:2018/07/09 07:35:27
    done sleeping
    endTime:2018/07/09 07:35:45
    Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value****abc
    the key is : 12345678
    startTime:2018/07/09 07:36:27
    done sleeping
    endTime:2018/07/09 07:36:47
    Offset*****224 partitionId****0taskId*****0_0javaThreadId*******14 value****abc
    the key is : 12345678
    startTime:2018/07/09 07:37:27
    done sleeping
    endTime:2018/07/09 07:37:47
    Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value****abc

  • I have tried to explicitly call context#commit() (see the snippet below), but it doesn't work either. What am I missing here? Does Kafka Streams remember the previous processing state? If not, why does it print "done sleeping" exactly 20000 ms into the attempt after the first one (max.poll.interval.ms = 60000 ms, processing time (sleep) set to 80000 ms)?
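
    For reference, a sketch of the explicit commit attempt, assuming the commit is requested at the end of process():

        @Override
        public void process(String key, String value) {
            // ... processing as shown above ...
            context.commit();   // explicit commit request, which doesn't seem to take effect
        }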

Additional info:

  • My input and output topics have 2 partitions each and I have set the StreamsConfig num.stream.threads to 2.

  • I have a 3-node Kafka cluster; Kafka and Kafka Streams version is 1.1.0.

  • I do not use the punctuate method or any complex processing anywhere.

Thanks in advance.


Solution

  • Not 100% sure, however note: if you call context#commit() you only "request" a commit, and Kafka Streams tries to commit as soon as possible -- but when context#commit() returns, the commit has not necessarily happened yet...

    Also note that if your timeout is 60000 ms and you sleep for 80000 ms, your application should be dropped out of the consumer group and is thus no longer allowed to commit. There should be a WARN message in the logs for this case.

    Hope this helps.
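
    If the long per-record processing is intentional, one possible remedy (sketched below with assumed values, not part of the original answer) is to raise max.poll.interval.ms above the worst-case processing time for one poll batch, and/or to reduce how many records are fetched per poll:

        // assumed values for illustration; merge into the existing Streams Properties
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 120000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);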