How do I increase the performance of a Kafka consumer? I have (and need) at-least-once Kafka consumer semantics.
I have the configuration below. processInDB() takes 2 minutes to complete, so processing just 10 messages (all in a single partition) takes 20 minutes (assuming 2 minutes per message). I could call processInDB() in a different thread, but then I can lose messages. How can I process all 10 messages within a 2 to 4 minute window?
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic120112141");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(errorHandler());
Below is my Kafka Consumer code.
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "6", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
dbservice.processInDB(message);
}
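Using a batch listener would help: you just need to hold the consumer thread in the listener until all of the individual records in the batch have completed processing. The offsets are then committed only after the whole batch is done, so at-least-once delivery is preserved.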
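A minimal sketch of the "hold the consumer thread" idea, using only java.util.concurrent. The class name ParallelBatchProcessor and the method processBatch are hypothetical, not part of Spring Kafka. The caller's thread blocks until every record in the batch has been handled, and a failure in any record propagates to the caller so the batch is not acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical helper, not a Spring Kafka class.
public class ParallelBatchProcessor {

    // Runs `handler` for every record of one polled batch on `pool` and returns
    // only after all of them have finished. If any handler throws, join()
    // rethrows it wrapped in a CompletionException, so the calling listener
    // fails and the batch offsets are not committed.
    public static <T> void processBatch(List<T> batch, Consumer<T> handler, ExecutorService pool) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (T record : batch) {
            futures.add(CompletableFuture.runAsync(() -> handler.accept(record), pool));
        }
        // Block the calling (consumer) thread until the whole batch is done.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```

In the listener, this would presumably be called from a method that receives a List<ValidatedConsumerClass>, with factory.setBatchListener(true) and AckMode.BATCH on the container factory, passing dbservice::processInDB as the handler. With a pool of 10 threads and max.poll.records at 10, the batch should finish in roughly 2 minutes, which stays under the default max.poll.interval.ms of 5 minutes. Two caveats: records from the same partition are no longer processed in order, and a failed batch is redelivered as a whole, so processInDB() needs to tolerate duplicates.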
In the next release (the 2.8.0-M1 milestone was released today) there is support for out-of-order manual acknowledgments, where the framework defers the commits until the "gaps are filled": https://docs.spring.io/spring-kafka/docs/2.8.0-M1/reference/html/#x28-ooo-commits
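To illustrate what "deferring commits until the gaps are filled" means, here is a rough sketch of the bookkeeping for one partition. It is not the framework's implementation: OffsetGapTracker is a hypothetical class, and it assumes offsets within a poll are contiguous, which is a simplification (with read_committed, transaction markers can leave holes in the offset sequence).

```java
import java.util.TreeSet;

// Hypothetical illustration of gap-aware commits for a single partition.
public class OffsetGapTracker {
    // Offsets acknowledged out of order that cannot be committed yet.
    private final TreeSet<Long> acked = new TreeSet<>();
    // The position that is safe to commit: the first offset not yet acknowledged.
    private long nextToCommit;

    public OffsetGapTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Records an acknowledgment and returns the position that may be committed.
    // The position only advances once every earlier offset has been acknowledged.
    public synchronized long ack(long offset) {
        if (offset >= nextToCommit) {
            acked.add(offset);
        }
        // Advance over every contiguous acknowledged offset.
        while (acked.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

For example, starting at offset 10, acknowledging 12 and then 11 leaves the commit position at 10; acknowledging 10 then moves it to 13 in one step. If a worker crashes before acknowledging, the uncommitted records are redelivered, which is what keeps the semantics at-least-once. In 2.8 this behaviour is, as far as I can tell from the linked section, enabled through the asyncAcks container property together with a manual ack mode; check the reference before relying on that name.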