I am new to Kafka and am learning how to produce and consume messages to and from a Kafka topic.
I have configured the consumer with @EnableKafka as follows:
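import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {

    @Value("${kafka.servers}")
    private String kafkaServerAddress;

    @Value("${kafka.ca.groupid}")
    private String groupId;

    private ApplicationContext context;

    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Kafka's own ConsumerConfig is fully qualified because this class has the same simple name.
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress);
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
        // Keys are plain strings; values are JSON mapped to BinlogMessage.
        defaultFactory.setKeyDeserializer(new StringDeserializer());
        defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
        factory.setConsumerFactory(defaultFactory);
        return factory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }
}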
I found the answer: set the AUTO_OFFSET_RESET_CONFIG property (auto.offset.reset) to latest, so that a consumer group with no committed offset starts reading from the end of the topic:
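public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // Add this alongside the other consumer properties.
    // Fully qualified: the configuration class is itself named ConsumerConfig,
    // so the simple name would not resolve to Kafka's class.
    props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props);
}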
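
For anyone who wants to check the property map without a broker, here is a minimal sketch in plain Java with no Kafka dependency. It uses the literal keys that the ConsumerConfig constants resolve to. ConsumerProps and build are hypothetical names for illustration, not part of Kafka or Spring, and the validation only covers the three classic values earliest, latest and none. Keep in mind that auto.offset.reset only takes effect when the group has no committed offset for a partition (or that offset is out of range); a group that has already committed offsets resumes from them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of Kafka or Spring.
final class ConsumerProps {

    // The three classic values accepted for auto.offset.reset.
    private static final Set<String> RESET_POLICIES = Set.of("earliest", "latest", "none");

    private ConsumerProps() {
    }

    // Builds a consumer property map using the literal keys behind
    // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG and
    // AUTO_OFFSET_RESET_CONFIG.
    static Map<String, Object> build(String servers, String groupId, String resetPolicy) {
        if (resetPolicy == null || !RESET_POLICIES.contains(resetPolicy)) {
            throw new IllegalArgumentException("Unsupported auto.offset.reset value: " + resetPolicy);
        }
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", resetPolicy);
        return props;
    }
}
```

The resulting map could then be handed to the factory, for example new DefaultKafkaConsumerFactory<>(ConsumerProps.build(kafkaServerAddress, groupId, "latest")).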