java, spring-boot, spring-kafka

How can setConsumerRebalanceListener get the calling ConsumerAwareRebalanceListener instance?


Maybe this is a naive question, but it has somehow had me stuck for some time. Please bear with me.

I have a class DataConsumer.java that implements ConsumerAwareRebalanceListener:

@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // seek offsets based on a given timestamp
    }
    @KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> payload) {}
}

For onPartitionsAssigned to be invoked, I need to call setConsumerRebalanceListener in the kafkaListenerContainerFactory method, which is defined in another class like this:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setConsumerRebalanceListener(____________);
    // rest part omitted
}

My question is about the ____________ part above. What should I put there?

As I understand it, the kafkaListenerContainerFactory method is called when the @KafkaListener container in the DataConsumer class is initialized, which means there is already an existing DataConsumer instance holding the @KafkaListener. How can I pass that existing DataConsumer instance to setConsumerRebalanceListener?

All the sample code snippets I can find look like this:

setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    //override the functions
})

But isn't this creating a new instance? If I put new DataConsumer() there, it will lose the state held by the existing instance (e.g. the timestamp used to seek offsets), so this can't work.


Solution

  • You can declare DataConsumer as a @Bean (instead of using @Component) and then inject that bean into the method that configures the factory.

    However, this is the wrong mechanism to use in this case.

    Implement ConsumerSeekAware instead - the container will automatically detect that your listener implements that interface and will call its onPartitionsAssigned, so nothing needs to be set on the factory.

    See "Seeking to a Specific Offset" in the documentation.
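To make the mechanism concrete, here is a minimal plain-Java sketch of the idea, not Spring Kafka code. SeekAware, SeekCallback and Container are hypothetical, simplified stand-ins for ConsumerSeekAware, its seek callback, and the listener container; the real interface has different method signatures, so check the documentation before copying anything. The point it illustrates is that the container already holds the listener instance, so it can detect the interface and call back into that same object, and the stored timestamp is not lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SeekAwareSketch {

    // Hypothetical stand-in for the seek callback the container hands to the listener.
    interface SeekCallback {
        void seekToTimestamp(String topic, int partition, long timestamp);
    }

    // Hypothetical stand-in for ConsumerSeekAware, reduced to a single method.
    interface SeekAware {
        void onPartitionsAssigned(String topic, List<Integer> partitions, SeekCallback callback);
    }

    // Plays the role of DataConsumer: it holds state (the timestamp) and asks for seeks.
    static class DataConsumer implements SeekAware {
        private final long startTimestamp;

        DataConsumer(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }

        @Override
        public void onPartitionsAssigned(String topic, List<Integer> partitions, SeekCallback callback) {
            for (int partition : partitions) {
                callback.seekToTimestamp(topic, partition, startTimestamp);
            }
        }
    }

    // Plays the role of the listener container: it keeps a reference to the listener,
    // checks whether it implements SeekAware, and calls back into that same instance.
    static class Container {
        private final Object listener;
        final List<String> performedSeeks = new ArrayList<>();

        Container(Object listener) {
            this.listener = listener;
        }

        void assign(String topic, List<Integer> partitions) {
            if (listener instanceof SeekAware) {
                SeekAware aware = (SeekAware) listener;
                // Record each requested seek instead of talking to a real broker.
                aware.onPartitionsAssigned(topic, partitions,
                        (t, p, ts) -> performedSeeks.add(t + "-" + p + "@" + ts));
            }
        }
    }

    public static void main(String[] args) {
        DataConsumer consumer = new DataConsumer(1_700_000_000_000L);
        Container container = new Container(consumer);
        container.assign("dataTopic", Arrays.asList(0, 1));
        // prints [dataTopic-0@1700000000000, dataTopic-1@1700000000000]
        System.out.println(container.performedSeeks);
    }
}
```

Because the callback arrives on the existing listener object, no new DataConsumer() is ever created and no rebalance listener has to be passed to the factory by hand.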