apache-kafka, spring-kafka

Why did I lose messages when using Kafka with EOS Beta?


I'm using spring-kafka 2.5.5.RELEASE and I lost messages while using Kafka with exactly-once semantics (EOS).

Configuration

My listener uses @KafkaListener with this container factory configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
        KafkaTransactionManager<Object, Object> kafkaTransactionManager,
        ConsumerRecordRecoverer myRecoverer) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}

@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer);
}

Properties:

spring.kafka.producer.acks = all
spring.kafka.consumer.isolation-level = read_committed
spring.kafka.consumer.group-id = ap_u2
# Use hostname to generate unique transaction id per instance
spring.kafka.producer.transaction-id-prefix = tx_ap_u2_${HOSTNAME}_

The topic has 3 partitions and I have 3 instances of my application, each instance consuming 1 partition.

Problem

I lost some messages in a read-process-write case. The output topic did not receive the processed messages.

The DLT was empty and the listener continued processing the messages.
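
For context, the listener follows a standard read-process-write pattern, roughly like the sketch below (topic names, record types, and the process step are illustrative placeholders, not my exact code):

@Component
public class MyListener {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyListener(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // DefaultBatchToRecordAdapter lets a record-style method be used with the batch factory
    @KafkaListener(topics = "input-topic", containerFactory = "kafkaBatchListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record) {
        String result = process(record.value());                  // business processing (placeholder)
        kafkaTemplate.send("output-topic", record.key(), result); // write participates in the container-started transaction
    }

    private String process(String value) {
        return value; // placeholder transformation
    }
}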

The lost messages were associated with the following error logs:

Nov 2, 2020 @ 16:04:23.696
org.springframework.kafka.core.DefaultKafkaProducerFactory
commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.788
org.springframework.kafka.core.DefaultKafkaProducerFactory
Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]

Nov 2, 2020 @ 16:04:23.979
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.981
org.apache.kafka.clients.producer.internals.Sender
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

26 times on different messages:
Nov 2, 2020 @ 16:04:23.985 to Nov 2, 2020 @ 16:04:24.787
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

This log appeared 3 times:
Nov 2, 2020 @ 16:04:24.893
org.apache.kafka.clients.NetworkClient
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Uncaught error in request completion:
java.lang.IllegalStateException: Should not reopen a batch which is already aborted.
    at org.apache.kafka.common.record.MemoryRecordsBuilder.reopenAndRewriteProducerState(MemoryRecordsBuilder.java:295)

6 times:
Nov 2, 2020 @ 16:04:24.890 to Nov 2, 2020 @ 16:04:24.887
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:25.085
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerCons
Producer or 'group.instance.id' fenced during transaction
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Question

Why were 244 messages lost by a read-process-write transactional listener using EOS Beta semantics, with no information beyond the fact that the producer was fenced?

Thanks


Solution

  • Known bug in Kafka: KAFKA-9803

    Workaround implemented in spring-kafka 2.5.8.RELEASE and 2.6.3.RELEASE

    See the stopContainerWhenFenced option (a configuration sketch follows below)
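
    As a sketch, the workaround can be enabled on the container factory from the question after upgrading to one of the releases above (setter name per the ContainerProperties property added with the workaround; verify against your spring-kafka version):

    // Stop the container when the producer is fenced, instead of continuing and
    // silently skipping the records from the aborted transaction.
    factory.getContainerProperties().setStopContainerWhenFenced(true);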