I'm using spring-kafka 2.5.5.RELEASE and I lost messages while using Kafka with exactly-once semantics.
Configuration
My listener uses @KafkaListener
with this container factory configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
        KafkaTransactionManager<Object, Object> kafkaTransactionManager,
        ConsumerRecordRecoverer myRecoverer
) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}
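For context, a minimal listener compatible with this factory might look like the sketch below (the topic names, kafkaTemplate field, and process() helper are hypothetical, not from the original post). Note that with a DefaultBatchToRecordAdapter, the batch is delivered to a record-style listener one record at a time:

```java
// Hypothetical read-process-write listener; topics and process() are illustrative.
@KafkaListener(topics = "input", containerFactory = "kafkaBatchListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
    // The send participates in the Kafka transaction started by the container,
    // so the consumer offset and the outgoing record commit (or roll back) together.
    kafkaTemplate.send("output", process(record.value()));
}
```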
@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer);
}
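The DeadLetterPublishingRecoverer bean itself is not shown above; a minimal sketch, assuming a KafkaTemplate is available as a bean, could be:

```java
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template) {
    // By default, publishes the failed record to <originalTopic>.DLT on the same partition.
    return new DeadLetterPublishingRecoverer(template);
}
```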
Properties:
spring.kafka.producer.acks = all
spring.kafka.consumer.isolation-level = read_committed
spring.kafka.consumer.group-id = ap_u2
# Use hostname to generate unique transaction id per instance
spring.kafka.producer.transaction-id-prefix = tx_ap_u2_${HOSTNAME}_
The topic has 3 partitions and I have 3 instances of my application, each instance consuming 1 partition.
Problem
I lost some messages in a read-process-write case: the output topic did not receive the processed messages.
The DLT was empty, and the listener continued processing subsequent messages.
The lost messages were associated with these error logs:
Nov 2, 2020 @ 16:04:23.696
org.springframework.kafka.core.DefaultKafkaProducerFactory
commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:23.788
org.springframework.kafka.core.DefaultKafkaProducerFactory
Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
Nov 2, 2020 @ 16:04:23.979
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:23.981
org.apache.kafka.clients.producer.internals.Sender
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This exception occurred 26 times on different messages:
Nov 2, 2020 @ 16:04:23.985 to Nov 2, 2020 @ 16:04:24.787
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This log appeared 3 times:
Nov 2, 2020 @ 16:04:24.893
org.apache.kafka.clients.NetworkClient
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Uncaught error in request completion:
java.lang.IllegalStateException: Should not reopen a batch which is already aborted.
at org.apache.kafka.common.record.MemoryRecordsBuilder.reopenAndRewriteProducerState(MemoryRecordsBuilder.java:295)
6 times:
Nov 2, 2020 @ 16:04:24.890 to Nov 2, 2020 @ 16:04:24.887
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:25.085
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerCons
Producer or 'group.instance.id' fenced during transaction
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Question
Why were 244 messages lost by a read-process-write transactional listener with EOSMode.BETA semantics, with no more information than that the transaction was fenced?
Thanks
This is a known bug in Kafka: KAFKA-9803.
A workaround was implemented in spring-kafka 2.5.8.RELEASE and 2.6.3.RELEASE;
see the stopContainerWhenFenced container property.
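As a sketch (assuming an upgrade to 2.5.8.RELEASE or later), the option is enabled on the container properties of the factory shown above:

```java
// Available since spring-kafka 2.5.8: when the producer is fenced, stop the
// container instead of treating it like a normal rollback, so the records from
// the fenced transaction are not skipped and can be reprocessed after a restart.
factory.getContainerProperties().setStopContainerWhenFenced(true);
```

The container then has to be restarted (manually or via a ContainerStoppedEvent handler) to resume consumption, which trades availability for not losing records.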