spring-boot apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api

Getting ProducerFencedException on producing record in Kafka listener thread


I am getting these exceptions when producing a message inside a Kafka listener container:

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-tx-group.topicA.1
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId

My listener looks like this:

    @Transactional
    @KafkaListener(...) // subscribed to topicA
    public void listener(String message) {
        process(message);
        kafkaTemplate.send(topicB, notification); // send the notification with KafkaTemplate
    }

My configuration looks like this:

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaTransactionManager kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
              new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }

    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }


    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        return template;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
        return manager;
    }

I know when Kafka throws ProducerFencedException, but what I am trying to figure out is where the second producer with the same transactional.id comes from.

If I set a unique transaction ID prefix on the KafkaTemplate, it works fine:

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        template.setTransactionIdPrefix(MessageFormat.format("{0}-{1}", transactionIdPrefix, UUID.randomUUID().toString()));
        return template;
    }

But I am trying to understand the exception: where is the other producer with the same transactional ID being started? According to the Spring docs, for listener-started transactions the ID is built from the group.id, topic, and partition.

I am only trying this locally, on a single application instance.


Solution

  • I found the root cause: this method was being called twice, so I ended up with two producer factories, and therefore two producers with the same transactional ID:

        public ProducerFactory<String, Object> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
            DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
            factory.setTransactionIdPrefix(transactionIdPrefix);
            return factory;
        }
    

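    I was missing the @Bean annotation. Without it, producerFactory() is a plain method, so kafkaTemplate() and kafkaTransactionManager() each created their own factory with the same transaction ID prefix. Adding @Bean to the producer factory and properly autowiring it into the template and the transaction manager fixed the issue.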
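
For reference, the corrected configuration might look roughly like the sketch below. It is not the exact code from my project: the class name `KafkaProducerConfig` and the two placeholder values (`localhost:9092`, `producer-tx-`) are assumptions for illustration, and serializer settings are left out just as in the question. The point is that the single `ProducerFactory` bean is injected as a method parameter into both the template and the transaction manager, instead of each of them calling `producerFactory()` on its own.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaProducerConfig { // hypothetical class name

    // Placeholder values (assumptions); the original code reads these from its own fields.
    private final String bootstrapServers = "localhost:9092";
    private final String transactionIdPrefix = "producer-tx-";

    // @Bean makes this a singleton: Spring creates exactly one factory.
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        DefaultKafkaProducerFactory<String, Object> factory =
                new DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

    // The shared factory bean is injected rather than created here.
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    // The transaction manager uses the same factory instance as the template.
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}
```

With one shared factory, the template should take part in the transaction that the listener container started, rather than opening a second producer with the same transactional ID.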