spring-boot, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Wrapping StreamBridge send and JPA save inside a transaction


I am using Spring Boot 2.5.2 and Spring Cloud 2020.0.3. I am trying to make a REST service call transactional: it saves a record to a DB using JPA (CrudRepository.save) and then uses StreamBridge to post a message to a Kafka topic via spring-cloud-stream (Kafka binder). I have tried several things, but nothing works quite right. When I intentionally cause a JPA failure (inserting a row that violates a unique key constraint), the Kafka message still goes out to the broker.

  1. I configured a KafkaTransactionManager (without ChainedKafkaTransactionManager, since that is now deprecated). However, it appears to be ignored: StreamBridge seems to create its own transaction manager internally when a transaction-id-prefix is present in the configuration.
  2. Without the transaction-id-prefix, the ProducerFactory is not transactional at all, which causes the KafkaTransactionManager instantiation to fail.
  3. I tried not creating my own transaction manager at all, but the Kafka message is still sent.

What is the proper way to configure this type of flow so that writes to both the DB and the broker are atomic?

HTTP -> JPA save -> Kafka send


Solution

  • You don't need a transaction manager but you do need a transactional.id on the producer factory.

    If the send is performed within the scope of the JPA transaction (e.g. a @Transactional method with the JPA TM), the Kafka template will synchronize the Kafka transaction with the existing transaction and either commit it or roll it back, depending on the outcome of the main transaction.
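
    The idea of a synchronized transaction can be illustrated with a toy model. This is not Spring's implementation: every class and method below (SyncSketch, MainTransaction, KafkaSide, Synchronization, run) is hypothetical and uses only the JDK. It only shows the intended behaviour, namely that the Kafka side does not decide anything itself and simply follows the outcome of the main transaction.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    class SyncSketch {

        /** Hypothetical callback: a resource that follows the main transaction's outcome. */
        interface Synchronization {
            void afterCompletion(boolean committed);
        }

        /** Toy "main" transaction (think: the JPA transaction). */
        static class MainTransaction {
            private final List<Synchronization> synchronizations = new ArrayList<>();

            void register(Synchronization s) {
                synchronizations.add(s);
            }

            /** Runs the work, then tells every registered resource how it ended. */
            void execute(Runnable work) {
                boolean committed = false;
                try {
                    work.run();
                    committed = true;
                } finally {
                    for (Synchronization s : synchronizations) {
                        s.afterCompletion(committed);
                    }
                }
            }
        }

        /** Toy Kafka-side transaction: buffers sends and records its own outcome. */
        static class KafkaSide implements Synchronization {
            final List<String> pending = new ArrayList<>();
            String outcome = "open";

            void send(String record) {
                pending.add(record);
            }

            @Override
            public void afterCompletion(boolean committed) {
                outcome = committed ? "committed" : "aborted";
            }
        }

        /** Runs one unit of work and returns the Kafka-side outcome. */
        static String run(boolean failDbWrite) {
            MainTransaction tx = new MainTransaction();
            KafkaSide kafka = new KafkaSide();
            tx.register(kafka);
            try {
                tx.execute(() -> {
                    kafka.send("test");
                    if (failDbWrite) {
                        // stands in for a failure that surfaces after the send, e.g. at flush
                        throw new IllegalStateException("unique key violation");
                    }
                });
            } catch (IllegalStateException expected) {
                // the main transaction rolled back; the exception still reaches the caller
            }
            return kafka.outcome;
        }

        public static void main(String[] args) {
            System.out.println(run(false)); // committed
            System.out.println(run(true));  // aborted
        }
    }
    ```

    In the failing case the record was already sent when the main transaction failed, yet the Kafka side ends up aborted, which is the outcome the question is asking for.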

    Are you aware that even rolled-back records are actually written to the log? To avoid receiving them, you must set the consumer property isolation.level to read_committed; it defaults to read_uncommitted.
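
    As a sketch of that setting, assuming the consuming application also uses the Spring Cloud Stream Kafka binder, the isolation level can be passed through the binder's generic Kafka consumer properties. The binding name "input" below is hypothetical, and only one of the two forms is needed.

    ```properties
    # binder-wide: applies to every consumer binding created by this binder
    spring.cloud.stream.kafka.binder.consumer-properties.isolation.level=read_committed

    # or per binding ("input" is a hypothetical binding name)
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.isolation.level=read_committed
    ```

    A plain Spring Boot consumer that does not go through the binder would typically use spring.kafka.consumer.isolation-level=read_committed instead.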

    EDIT

    There is a bug synchronizing producer-only transactions to an existing transaction; the send is performed in a local transaction instead.

    You can use a TransactionTemplate to start a Kafka transaction as a work-around:

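    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.binder.BinderFactory;
    import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.transaction.KafkaTransactionManager;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.transaction.support.TransactionTemplate;
    
    @SpringBootApplication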
    public class So68460690Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So68460690Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
            return args -> {
                new TransactionTemplate(ktm).executeWithoutResult(
                        status -> foo.doInTx(bridge)); // or execute() to return a result
            };
        }
    
        @Bean
        KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
            return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
                    .getTransactionalProducerFactory());
        }
    
    }
    
    @Component
    class Foo {
    
        @Transactional
        public void doInTx(StreamBridge bridge) {
            bridge.send("output", "test"); // binding name matches spring.cloud.stream.bindings.output
            throw new RuntimeException("testEx");
        }
    
    }
    
    With the following properties:
    
    spring.cloud.stream.bindings.output.destination=so68460690
    
    spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
    spring.cloud.stream.kafka.binder.configuration.acks=all
    
    
    logging.level.org.springframework.kafka=trace
    
    The resulting debug log shows the Kafka transaction being started and then rolled back:
    
    2021-07-27 17:31:37.923 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
    2021-07-27 17:31:37.924 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
    2021-07-27 17:31:37.927 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
    2021-07-27 17:31:37.928 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()