java, spring-boot, apache-kafka, spring-kafka

How to use the ConsumerSeekAware interface to fetch Kafka messages from a particular offset


I have a scenario where I need to use the ConsumerSeekAware interface to fetch Kafka messages starting from a particular offset.

Currently my Kafka listener is configured as follows:

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@EventListener(ApplicationReadyEvent.class)
public void kafkaListenerContainerFactory() {
    ConcurrentMessageListenerContainer<String, String> factory = new ConcurrentMessageListenerContainer<>(consumerFactory(), new ContainerProperties("mytopic"));

    factory.setConcurrency(3);
    factory.setAutoStartup(true);
    factory.setupMessageListener(listenKafkaMessage());
    factory.start();
}

MessageListener<String, String> listenKafkaMessage(){
  return this::consumeCustomMessage;
}

void consumeCustomMessage(ConsumerRecord<String, String> message){
  System.out.println("Message Received : " + message.value());
}

I would like to know how to implement ConsumerSeekAware and where to register or use it in my existing code.


Solution

  • You can force a consumer to start reading from a specific offset in a topic partition. To start reading from a specific offset every time your application starts, implement the ConsumerSeekAware interface as follows:

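    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.ConsumerSeekAware;
    import org.springframework.stereotype.Component;

    @Component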
    public class MyKafkaListener implements ConsumerSeekAware {
    
        @KafkaListener(id = "myListener", topics = "mytopic")
        public void listen(ConsumerRecord<String, String> record) {
            System.out.println("Message Received : "+ record.value());
        }
    
        @Override
        public void registerSeekCallback(ConsumerSeekCallback callback) {
        }
    
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
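            // Runs whenever partitions are assigned to this listener's consumer,
            // including after a rebalance. Seek only if partition 0 of 'mytopic'
            // is actually among the assigned partitions; seeking on a partition
            // this consumer does not own would fail.
            if (assignments.containsKey(new TopicPartition("mytopic", 0))) {
                callback.seek("mytopic", 0, 123);
            }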
        }
    
        @Override
        public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        }
    }
    
    

    In this example, every time the listener is started and partition 0 of mytopic is assigned to it, the listener starts reading from offset 123. Note that the seek is repeated every time the application starts or partition 0 is reassigned to the listener after a rebalance, so the records from offset 123 onward are consumed again each time.

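    Make sure the consumer does not commit offsets automatically. Recent versions of Spring for Apache Kafka (2.3 and later) already default enable.auto.commit to false unless you configure it otherwise, but you can set the ENABLE_AUTO_COMMIT_CONFIG property to false explicitly: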

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    

    If you want your listener code to acknowledge each message itself, set the AckMode of the listener container to MANUAL_IMMEDIATE or MANUAL. With either mode the listener must receive an Acknowledgment argument and call acknowledge() once it has processed a message:

    ContainerProperties containerProperties = new ContainerProperties("mytopic");
    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
    

    Finally, note that the example above is quite basic. In a real-world scenario, you may need to store the offsets your consumer has processed so that you can resume from the right position after a restart. Adjust the implementation to your needs.
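
As a rough illustration of the offset bookkeeping mentioned above, here is a minimal sketch in plain Java. ProcessedOffsetStore, markProcessed and resumeOffset are hypothetical names, not part of Spring for Apache Kafka, and the store is in-memory only; to survive a restart the map would have to be backed by a database or a file.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the last processed offset per topic-partition. */
final class ProcessedOffsetStore {

    // Key is "<topic>-<partition>"; value is the highest offset processed so far.
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Call after a record has been fully processed. */
    void markProcessed(String topic, int partition, long offset) {
        // Long::max keeps the highest offset, so a replayed older record
        // never moves the stored position backwards.
        lastProcessed.merge(key(topic, partition), offset, Long::max);
    }

    /**
     * Offset to seek to when the partition is assigned: one past the last
     * processed record, or the given fallback if nothing was recorded yet.
     */
    long resumeOffset(String topic, int partition, long fallback) {
        Long last = lastProcessed.get(key(topic, partition));
        return last == null ? fallback : last + 1;
    }
}
```

Under these assumptions, the listener would call markProcessed(...) after handling each record, and onPartitionsAssigned would pass resumeOffset(...) to callback.seek(...) in place of the hard-coded 123.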

    EDIT based on Gary Russell's suggestions:

    1. Extending AbstractConsumerSeekAware: Instead of directly implementing ConsumerSeekAware, consider extending AbstractConsumerSeekAware. This class provides some convenient default methods, making your implementation cleaner.

    Change your class definition to:

    @Component
    public class MyKafkaListener extends AbstractConsumerSeekAware {
        // ... rest of your methods
    }
    
    2. Spring Bean Declaration: Ensure that your ConsumerFactory and ConcurrentMessageListenerContainer are managed by the Spring container. Declare them as @Beans:

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // ... your existing code
    }
    
    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainerFactory() {
        // ... your existing code
    }
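
If you keep the hand-built ConcurrentMessageListenerContainer from the question instead of switching to @KafkaListener, one possible approach is to have the message listener object itself implement ConsumerSeekAware, since the container checks whether its listener implements that interface. The sketch below assumes spring-kafka and kafka-clients are on the classpath; SeekingMessageListener is a hypothetical class name, and the topic, partition and offset are the illustrative values used earlier.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;

// Hypothetical listener for a manually created container: it both consumes
// records and receives the seek callbacks.
public class SeekingMessageListener implements MessageListener<String, String>, ConsumerSeekAware {

    private static final String TOPIC = "mytopic";   // illustrative values
    private static final int PARTITION = 0;
    private static final long START_OFFSET = 123L;

    @Override
    public void onMessage(ConsumerRecord<String, String> message) {
        System.out.println("Message Received : " + message.value());
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // Not needed here; keep a reference only if you want to seek later at arbitrary times.
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // With concurrency > 1 each consumer gets only some partitions,
        // so seek only when the target partition is in this assignment.
        if (assignments.containsKey(new TopicPartition(TOPIC, PARTITION))) {
            callback.seek(TOPIC, PARTITION, START_OFFSET);
        }
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // No seek on idle in this sketch.
    }
}
```

In the question's setup this would be registered with factory.setupMessageListener(new SeekingMessageListener()); in place of listenKafkaMessage().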