java spring-boot spring-cloud apache-kafka-streams spring-cloud-stream

How to get the ConsumerRecord object in StreamListener Consumer code


I want to enable manual commits for my consumer, using the code and configuration below. The goal is to control the offset commit myself in case the signIn client throws an exception. Manually committing the offset works fine, but with this code a message that failed to process is not consumed again. To fix that, I want to call the seek method so the same failed offset is consumed again:

consumer.seek(new TopicPartition(atCommunityTopic, communityFeed.partition()), communityFeed.offset());

The actual problem is where to get the partition and offset details from. If I could somehow get the ConsumerRecord object along with the message, this would work.

spring.cloud.stream.kafka.bindings.atcommnity.consumer.autoCommitOffset=false

Below is the consumer code, which uses StreamListener:

        @StreamListener(ConsumerConstants.COMMUNITY_IN)
        public void handleCommFeedConsumer(
                @Payload Account consumerRecords,
                @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
                @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

            consumerRecords.forEach(communityFeed -> {
                try {
                    AccountClient.signIn(
                            AccountIn.builder()
                                    .Id(communityFeed.getId())
                                    .build());
                    log.debug("Calling Client for Id : "
                            + communityFeed.getId());
                } catch (RuntimeException ex) {
                    log.info("");
                    // Intended: rewind to the failed offset so it is consumed again
                    //consumer.seek(new TopicPartition(communityTopic,communityFeed.partition()),communityFeed.offset());
                    return;
                }
                // Commit only after signIn succeeded
                acknowledgment.acknowledge();
            });
        }

Solution

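  • The partition and offset of the record are available as message headers, so you can add them as parameters to the listener method. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#consumer-record-metadata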

    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition

    @Header(KafkaHeaders.OFFSET) long offset

    IMPORTANT

    Seeking the consumer yourself might not do what you want, because the container may already have fetched other records after this one. It's best to throw an exception and let the error handler do the seeks for you.
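
To illustrate why a seek has to account for records that were already fetched, here is a minimal plain-Java sketch. It is a model of the idea only, not Spring Kafka's implementation: the names Rec, Outcome and handleBatch are hypothetical, and the batch stands in for whatever a single poll returned. After the first failure, the failed record and every record behind it in the batch are left unprocessed, and each affected partition is rewound to its first unprocessed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class SeekOnFailureSketch {

    /** Hypothetical stand-in for the metadata a Kafka ConsumerRecord carries. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Result of handling one already-fetched batch. */
    static final class Outcome {
        // Records whose handler call succeeded before any failure occurred.
        final List<Rec> processed = new ArrayList<>();
        // "topic-partition" -> first unprocessed offset, i.e. where to seek.
        final Map<String, Long> seekTo = new LinkedHashMap<>();
    }

    /**
     * Runs the handler over the batch in order. The handler returns false
     * to signal a failure (a real listener would throw instead).
     */
    static Outcome handleBatch(List<Rec> batch, Predicate<Rec> handler) {
        Outcome outcome = new Outcome();
        boolean failed = false;
        for (Rec rec : batch) {
            if (!failed && handler.test(rec)) {
                outcome.processed.add(rec);
                continue;
            }
            // The failed record and everything after it must be redelivered.
            failed = true;
            // Keep only the lowest unprocessed offset per partition.
            outcome.seekTo.putIfAbsent(rec.topic + "-" + rec.partition, rec.offset);
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("community", 0, 5L),
                new Rec("community", 1, 9L),
                new Rec("community", 0, 6L),   // this one fails
                new Rec("community", 0, 7L));
        Outcome outcome = handleBatch(batch, rec -> rec.offset != 6L);
        System.out.println("processed=" + outcome.processed.size()
                + " seekTo=" + outcome.seekTo);
    }
}
```

In this model, seeking only the failed record's partition is enough because the later record on that partition is discarded as well. If the listener kept processing the rest of the batch after its own seek, the record at offset 7 would be handled before offset 6 is redelivered, which is the ordering problem the note above warns about.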