Search code examples
apache-kafka rabbitmq spring-kafka spring-cloud-stream spring-rabbit

RabbitTransactionManager not rolling back at ChainedTransactionManager when an error occurs


I'm trying to use one transaction manager (ChainedTransactionManager) for Rabbit and Kafka, chaining RabbitTransactionManager and KafkaTransactionManager. We intend to achieve a Best effort 1-phase commit.

To test it, the transactional method throws an exception after the two operations (sending a message to a Rabbit exchange and publishing an event to Kafka). When running the test, the logs suggest a rollback is initiated, but the message ends up in Rabbit anyway.

  • Notes:
  • We're using QPid to simulate in-memory RabbitMQ for testing (version 7.1.12)
  • We're using an in-memory Kafka for testing (spring-kafka-test)
  • Other relevant frameworks/libraries: spring-cloud-stream

Here's the method where the problem occurs:

    /**
 * Sends one message to the RabbitMQ exchange output and one keyed message to the Kafka
 * topic output, then deliberately fails so that both sends are expected to be rolled back.
 *
 * @throws RuntimeException always, after both sends have been issued
 */
@Transactional
public void processMessageAndEvent() {
    // RabbitMQ leg: payload only, no extra headers.
    Message<String> rabbitMessage = MessageBuilder.withPayload("Message to RabbitMQ").build();
    outputToRabbitMQExchange.output().send(rabbitMessage);

    // Kafka leg: same shape of message, plus the record key header.
    Message<String> kafkaMessage = withPayload("Message to Kafka")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
            .build();
    outputToKafkaTopic.output().send(kafkaMessage);

    // Forced failure used to exercise the rollback path.
    throw new RuntimeException("We want the previous changes to rollback");
}

Here is the main Spring-boot application configuration:

    @SpringBootApplication
**@EnableTransactionManagement**
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

Here is TransactionManager configuration:

    /**
 * Exposes a transaction manager for RabbitMQ built on the supplied connection factory.
 *
 * @param cf connection factory passed to the {@code RabbitTransactionManager} constructor
 * @return the newly created {@code RabbitTransactionManager}
 */
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    RabbitTransactionManager rabbitTm = new RabbitTransactionManager(cf);
    return rabbitTm;
}

/**
 * Builds the primary {@code transactionManager} bean by chaining a Kafka transaction manager
 * (listed first) with the RabbitMQ transaction manager.
 *
 * @param rtm     RabbitMQ transaction manager to include in the chain
 * @param binders binder factory used to look up the binder named {@code "kafka"}
 * @return a {@code ChainedKafkaTransactionManager} wrapping both managers
 */
@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    // The Kafka transaction manager reuses the binder's own transactional producer factory.
    KafkaMessageChannelBinder kafkaBinder =
            (KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class);
    ProducerFactory<byte[], byte[]> producerFactory = kafkaBinder.getTransactionalProducerFactory();

    KafkaTransactionManager<byte[], byte[]> kafkaTm = new KafkaTransactionManager<>(producerFactory);
    kafkaTm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);

    return new ChainedKafkaTransactionManager<>(kafkaTm, rtm);
}

And finally, the relevant configuration in the application.yml file:

# Application configuration: Spring Cloud Stream bindings for RabbitMQ and Kafka, plus Kafka producer settings.
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      # Declared bindings. The first has no explicit binder, so it uses default-binder (rabbit, set below);
      # the second selects the kafka binder explicitly.
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          # NOTE(review): this key is "output_outputToRabbitMQExchange", but the binding declared above is
          # "source_outputToRabbitMQExchange". If binder-specific properties are matched by binding name,
          # "transacted: true" would not reach the Rabbit producer - confirm against the binder docs.
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              # NOTE(review): verify that "transacted" is a recognized Kafka producer binding property;
              # Kafka transactions appear to be configured via binder.transaction.transaction-id-prefix below.
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit

  # Kafka producer client properties and bootstrap servers under spring.kafka.
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}

When we execute the method, we can see the message is still in Rabbit despite the logs saying the transaction is to be rolled back.

Is there anything we could be missing or have misunderstood?


Solution

  • @EnableBinding is deprecated in favor of the newer functional programming model.

    That said, I copied your code/config pretty much as-is (note that `transacted` is not a Kafka producer binding property) and it works fine for me (Boot 2.4.5, cloud 2020.0.2)...

    /**
     * Reproduction application using the annotation-based binding model ({@code @EnableBinding}).
     *
     * <p>Registers a RabbitMQ transaction manager, a chained Kafka + RabbitMQ transaction manager
     * as the primary {@code transactionManager}, and a runner that calls {@code Foo.send("test")}
     * at startup.
     */
    @SpringBootApplication
    @EnableTransactionManagement
    @EnableBinding(Bindings.class)
    public class So67297869Application {

        public static void main(String[] args) {
            SpringApplication.run(So67297869Application.class, args);
        }

        /** Transaction manager for RabbitMQ, built on the supplied connection factory. */
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
            RabbitTransactionManager rabbitTm = new RabbitTransactionManager(cf);
            return rabbitTm;
        }

        /**
         * Primary transaction manager: a Kafka transaction manager (listed first) chained with
         * the RabbitMQ one. The Kafka manager uses the kafka binder's transactional producer factory.
         */
        @Bean(name = "transactionManager")
        @Primary
        public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
            KafkaMessageChannelBinder kafkaBinder =
                    (KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class);
            ProducerFactory<byte[], byte[]> producerFactory = kafkaBinder.getTransactionalProducerFactory();

            KafkaTransactionManager<byte[], byte[]> kafkaTm = new KafkaTransactionManager<>(producerFactory);
            kafkaTm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);

            return new ChainedKafkaTransactionManager<>(kafkaTm, rtm);
        }

        /** Startup hook that triggers the transactional send with the payload {@code "test"}. */
        @Bean
        public ApplicationRunner runner(Foo foo) {
            return args -> foo.send("test");
        }

    }
    
    /**
     * Output channel declarations for the two bindings used by the example; the
     * {@code @Output} values match the binding names configured in the YAML.
     */
    interface Bindings {
    
        /** Output channel for the binding named {@code source_outputToRabbitMQExchange}. */
        @Output("source_outputToRabbitMQExchange")
        MessageChannel rabbitOut();
    
        /** Output channel for the binding named {@code sink_outputToKafkaTopic}. */
        @Output("sink_outputToKafkaTopic")
        MessageChannel kafkaOut();
    
    }
    
    /**
     * Test component that sends the same payload to the RabbitMQ and Kafka output channels
     * inside one {@code @Transactional} method and then fails on purpose.
     */
    @Component
    class Foo {

        @Autowired
        Bindings bindings;

        /**
         * Sends {@code in} to RabbitMQ, then to Kafka, then throws.
         *
         * @param in payload used for both messages
         * @throws RuntimeException always, with message {@code "fail"}
         */
        @Transactional
        public void send(String in) {
            publishToRabbit(in);
            publishToKafka(in);
            throw new RuntimeException("fail");
        }

        /** Sends the payload on the Rabbit channel with header {@code myKey=test}. */
        private void publishToRabbit(String payload) {
            bindings.rabbitOut().send(
                    MessageBuilder.withPayload(payload)
                            .setHeader("myKey", "test")
                            .build());
        }

        /** Sends the payload on the Kafka channel with the bytes of {@code "test"} as message key. */
        private void publishToKafka(String payload) {
            bindings.kafkaOut().send(
                    MessageBuilder.withPayload(payload)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                            .build());
        }

    }
    
    # Configuration used for the working reproduction: bindings, Rabbit/Kafka binder settings and debug logging.
    spring:
      application:
        name: my-application
      main:
        allow-bean-definition-overriding: true
      cloud:
        stream:
          # Declared bindings; the first has no explicit binder and so uses default-binder (rabbit, set below).
          bindings:
            source_outputToRabbitMQExchange:
              content-type: application/json
              destination: outputToRabbitMQExchange
              group: ${spring.application.name}
            sink_outputToKafkaTopic:
              content-type: application/json
              destination: outputToKafkaTopic
              binder: kafka
          rabbit:
            bindings:
              # Keyed by the same binding name as declared under spring.cloud.stream.bindings above.
              source_outputToRabbitMQExchange:
                producer:
                  transacted: true
                  routing-key-expression: headers.myKey
          kafka:
            # Only binder-level settings here; no per-binding Kafka producer properties are set.
            binder:
              brokers: localhost:9092
              transaction:
                transaction-id-prefix: foo.${spring.application.name}.T
          default-binder: rabbit
    
      kafka:
        producer:
          properties:
            max.block.ms: 3000
            transaction.timeout.ms: 5000
            enable.idempotence: true
            retries: 1
            acks: all
        bootstrap-servers: localhost:9092
    
    # Debug logging for the Spring transaction, Kafka and Rabbit packages.
    logging:
      level:
        org.springframework.transaction: debug
        org.springframework.kafka: debug
        org.springframework.amqp.rabbit: debug
    
    2021-04-28 09:35:32.488 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Initiating transaction rollback
    2021-04-28 09:35:32.489 DEBUG 53253 --- [           main] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63439]
    2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Resuming suspended transaction after completion of inner transaction
    2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
    2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()
    

    And there is no message in the queue that I bound to the exchange with routing key `#`.

    What versions are you using?

    EDIT

    And here is the equivalent app after removing the deprecations, using the functional model and StreamBridge (same yaml):

    /**
     * Reproduction application for the functional model: no {@code @EnableBinding}; messages
     * are sent through {@code StreamBridge} by the {@code Foo} component.
     *
     * <p>Registers a RabbitMQ transaction manager, a chained Kafka + RabbitMQ transaction manager
     * as the primary {@code transactionManager}, and a runner that calls {@code Foo.send("test")}
     * at startup.
     */
    @SpringBootApplication
    @EnableTransactionManagement
    public class So67297869Application {

        public static void main(String[] args) {
            SpringApplication.run(So67297869Application.class, args);
        }

        /** Transaction manager for RabbitMQ, built on the supplied connection factory. */
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
            RabbitTransactionManager rabbitTm = new RabbitTransactionManager(cf);
            return rabbitTm;
        }

        /**
         * Primary transaction manager: a Kafka transaction manager (listed first) chained with
         * the RabbitMQ one. The Kafka manager uses the kafka binder's transactional producer factory.
         */
        @Bean(name = "transactionManager")
        @Primary
        public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
            KafkaMessageChannelBinder kafkaBinder =
                    (KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class);
            ProducerFactory<byte[], byte[]> producerFactory = kafkaBinder.getTransactionalProducerFactory();

            KafkaTransactionManager<byte[], byte[]> kafkaTm = new KafkaTransactionManager<>(producerFactory);
            kafkaTm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);

            return new ChainedKafkaTransactionManager<>(kafkaTm, rtm);
        }

        /** Startup hook that triggers the transactional send with the payload {@code "test"}. */
        @Bean
        public ApplicationRunner runner(Foo foo) {
            return args -> foo.send("test");
        }

    }
    
    /**
     * Test component that sends the same payload to the RabbitMQ and Kafka bindings through
     * {@code StreamBridge} inside one {@code @Transactional} method and then fails on purpose.
     */
    @Component
    class Foo {

        @Autowired
        StreamBridge bridge;

        /**
         * Sends {@code in} to the Rabbit binding, then to the Kafka binding, then throws.
         *
         * @param in payload used for both messages
         * @throws RuntimeException always, with message {@code "fail"}
         */
        @Transactional
        public void send(String in) {
            publishToRabbit(in);
            publishToKafka(in);
            throw new RuntimeException("fail");
        }

        /** Sends the payload to {@code source_outputToRabbitMQExchange} with header {@code myKey=test}. */
        private void publishToRabbit(String payload) {
            bridge.send("source_outputToRabbitMQExchange",
                    MessageBuilder.withPayload(payload)
                            .setHeader("myKey", "test")
                            .build());
        }

        /** Sends the payload to {@code sink_outputToKafkaTopic} with the bytes of {@code "test"} as message key. */
        private void publishToKafka(String payload) {
            bridge.send("sink_outputToKafkaTopic",
                    MessageBuilder.withPayload(payload)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                            .build());
        }

    }