apache-kafka, kafka-consumer-api, producer-consumer, spring-kafka

How to set Kafka consumer configuration to consume messages from now?


I am new to Kafka and am learning how to produce messages to and consume messages from a Kafka topic.

I configure the consumer with @EnableKafka, as in the class below. How do I set it up so that it consumes only messages produced from now on?

@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {

    @Value("${kafka.servers}")
    private String kafkaServerAddress;

    @Value("${kafka.ca.groupid}")
    private String groupId;


    private ApplicationContext context;

    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
        defaultFactory.setKeyDeserializer(new StringDeserializer());
        defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
        factory.setConsumerFactory(defaultFactory);
        return factory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

}

Solution

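  • Got the answer: set the consumer property AUTO_OFFSET_RESET_CONFIG (auto.offset.reset) to latest, as follows. Note that this setting only takes effect when the consumer group has no valid committed offset for a partition (for example, with a new group id); a group that has already committed offsets resumes from them.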

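    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        // Fully qualified on purpose: if this method lives in the configuration class
        // from the question, that class is itself named ConsumerConfig and hides
        // Kafka's ConsumerConfig, so the simple name would not compile.
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(props);
    }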
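
To make it clearer when "latest" actually takes effect, here is a small, simplified model of how a consumer picks its starting position for one partition. It is not Kafka client code: the class OffsetResetSketch, the enum ResetPolicy, and the method startingOffset are hypothetical names made up for this illustration, and the offsets are invented numbers. It only sketches the rule that the reset policy is consulted when the group has no committed offset, or when the committed offset is no longer within the partition's retained range.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    /** Hypothetical stand-in for the auto.offset.reset values "earliest" and "latest". */
    public enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Simplified model of where a consumer starts reading a single partition.
     *
     * @param committedOffset offset committed by the consumer group, empty if none
     * @param beginningOffset offset of the oldest record still retained
     * @param endOffset       offset the next produced record will receive
     * @param policy          modelled auto.offset.reset setting
     */
    public static long startingOffset(OptionalLong committedOffset,
                                      long beginningOffset,
                                      long endOffset,
                                      ResetPolicy policy) {
        // A committed offset that is still in range wins; the policy is not consulted.
        if (committedOffset.isPresent()) {
            long committed = committedOffset.getAsLong();
            if (committed >= beginningOffset && committed <= endOffset) {
                return committed;
            }
        }
        // No usable committed offset: fall back to the reset policy.
        return policy == ResetPolicy.LATEST ? endOffset : beginningOffset;
    }

    public static void main(String[] args) {
        // Brand-new group, "latest": starts at the end, so only new messages are read.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 100, ResetPolicy.LATEST));   // 100
        // Brand-new group, "earliest": starts from the oldest retained message.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 100, ResetPolicy.EARLIEST)); // 0
        // Group that already committed offset 42: resumes at 42 even with "latest".
        System.out.println(startingOffset(OptionalLong.of(42), 0, 100, ResetPolicy.LATEST));    // 42
    }
}
```

The third case is the one to watch for: if the group id in kafka.ca.groupid has been used before and has committed offsets, setting "latest" alone will not skip the backlog.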