Tags: apache-kafka, spring-kafka, spring-transactions

Kafka Transaction Manager sends to Kafka Broker despite transaction rolling back


My Kafka producer keeps sending to the Kafka broker despite the transaction failing. I have a custom listener, i.e. I am not using the @KafkaListener annotation. This is running on Spring Kafka 2.2.x.

Any ideas why the message ends up in Kafka despite the KafkaTransactionManager rolling back? Here is my setup:

// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload) {
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, payload);
    template.executeInTransaction(kt -> kt.send(record));
}

// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final Object payload) {
    template.convertAndSend(topic, payload);
}

// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
       @Qualifier(value = "transactionalKafkaProducer") ProducerFactory<String, Object> producerFactory,
       @Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
       @Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
    KafkaTransactionManager<String, Object> producerKtm = new KafkaTransactionManager<>(producerFactory);
    producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}


// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);



// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    
    try {
            RetryState retryState = new DefaultRetryState(
                    consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());

            retryTemplate.execute(context -> {
                saveToDb();                                     // This rolls back
                sendToKafkaWithTransaction(topic, payload);     // This message gets to Kafka; it should not.
                sendToRabbitmqWithTransaction(topic, payload);  // This rolls back
                throw new RuntimeException("Out of Anger");
            }, recoveryCallBack, retryState);

            acknowledgment.acknowledge();
      }
      catch (ListenerExecutionFailedException e) {
         throw e;
      }
}    

// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager       : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(104745239<open>)] rollback-only

EDIT: Adding the Spring Boot config:

spring.kafka:
  admin:
    bootstrap-servers: ${kafka.host}
  consumer:
    group-id: test-consumers
    client-id: test-consumers
    auto-offset-reset: latest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    properties:
      isolation.level: read_committed
  producer:
    client-id: test-producer
    acks: all
    retries: 3
    transaction-id-prefix: test-producer-tx-
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
      isolation.level: read_committed

EDIT: More logs:

[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata        : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(23309560<open>)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$1237/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver  : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(23309560<open>)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

Solution

  • That's the way Kafka transactions work. Published records are always written to the log, followed by a marker record that indicates whether the transaction committed or rolled back.

    To avoid seeing the rolled-back records, you have to set the consumer isolation.level property to read_committed (it is read_uncommitted by default).
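
    For example, with a plain KafkaConsumer this is just the isolation.level client property (a minimal sketch; the broker address, group id, and String deserializers are placeholder assumptions):

    // classes from org.apache.kafka.clients.consumer and org.apache.kafka.common.serialization
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // read_committed: poll() only returns records whose transaction committed;
    // records from aborted transactions remain in the log but are filtered out
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);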

    EDIT

    It's because you are starting a new transaction:

    template.executeInTransaction(kt -> kt.send(record));
    
    /**
     * Execute some arbitrary operation(s) on the operations and return the result.
     * The operations are invoked within a local transaction and do not participate
     * in a global transaction (if present).
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    @Nullable
    <T> T executeInTransaction(OperationsCallback<K, V, T> callback);
    

    Just call template.send() and the template will participate in the transaction started by the container.

    You can also remove the @Transactional from that method.
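
    Applied to the question's sender, that reduces to something like this (a sketch, assuming the listener container has already started the Kafka transaction):

    public void sendToKafkaWithTransaction(final String topic, final Object payload) {
        // send() detects the transaction the container bound to this thread
        // and participates in it, so an abort also discards this record
        template.send(topic, payload);
    }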

    EDIT2

    This works as expected for me...

    spring.kafka.producer.transaction-id-prefix=tx-
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.isolation-level=read-committed
    
    logging.level.org.springframework.transaction=trace
    logging.level.org.springframework.kafka.core=trace
    
    @SpringBootApplication
    @EnableTransactionManagement
    public class So66306109Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66306109Application.class, args);
        }
    
        @Autowired
        Foo foo;
    
        @Transactional // Not really needed; the container has already started the transaction
        @KafkaListener(id = "so66306109", topics = "so66306109")
        public void listen(String in) {
            System.out.println(in);
            this.foo.send(in.toUpperCase());
            throw new RuntimeException("test");
        }
    
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
        }
    
    
        @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
        public void listen2(String in) {
            System.out.println(in);
        }
    
    }
    
    @Component
    class Foo {
    
        @Autowired
        KafkaTemplate<String, String> template;
    
        @Transactional // Not needed - we're already in a transaction
        public void send(String in) {
            this.template.send("so66306109-2", in);
        }
    
    }
    

    EDIT3

    If you cannot upgrade to a supported version, you need to disable transactions in the container and manage them yourself in your code, within the retry execute scope.

    Here is an example.

    @SpringBootApplication
    @EnableTransactionManagement
    public class So66306109Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66306109Application.class, args);
        }
    
        @Autowired
        Foo foo;
    
        @Autowired
        RetryTemplate template;
    
        @KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
        public void listen(ConsumerRecord<String, String> in) {
            this.template.execute(context -> {
                System.out.println(in);
                this.foo.send(in);
                return null;
            }, context -> {
                System.out.println("RETRIES EXHAUSTED");
                return null;
            });
        }
    
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
        }
    
        @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
        public void listen2(String in) {
            System.out.println(in);
        }
    
        @Bean
        ChainedKafkaTransactionManager<String, String> chainedTm(KafkaTransactionManager<String, String> ktm,
                ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
    
            // transactions can't be started by the container
            factory.getContainerProperties().setTransactionManager(null);
            return new ChainedKafkaTransactionManager<>(ktm);
        }
    
        @Bean
        public RetryTemplate template() {
            return new RetryTemplate();
        }
    
    }
    
    @Component
    class Foo {
    
        @Autowired
        KafkaTemplate<String, String> template;
    
        @Autowired
        ProducerFactory<String, String> pf;
    
        @Transactional("chainedTm")
        public void send(ConsumerRecord<String,String> in) {
            // updateDB
            this.template.send(new ProducerRecord<String, String>("so66306109-2", null, null, in.value().toUpperCase()));
            this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
                    new OffsetAndMetadata(in.offset() + 1)));
    
            // simulate a DB rollback
            KafkaResourceHolder<String, String> resource = (KafkaResourceHolder<String, String>) TransactionSynchronizationManager
                    .getResource(this.pf);
            resource.setRollbackOnly();
        }
    
    }
    

    Note: you must NOT manually acknowledge such records; instead, send the offset to the transaction before it is committed, as shown above.