Tags: transactions, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka, spring-cloud-function

How to enable a combined database + Kafka transaction in Spring Cloud Stream for producer-only transactions (db + kafka)?


We have an event-driven distributed architecture with separate producer and consumer microservices using Spring Cloud Stream. In the producer, the application needs to perform a database insert/update followed by publishing a message to Kafka. However, transactions only work for the database, not for Kafka: the DB transaction gets rolled back on error, but the Kafka message is still sent and read by the consuming microservice.

Versions used: spring-kafka 2.8.11, spring-boot 2.7.7, spring-cloud version 2021.0.5

To enable transactions, the @EnableTransactionManagement annotation is used on the Spring Boot application class. For the producer-only transaction, I have tried @Transactional and some other alternatives found in the documentation, but none of them work. When testing the transaction, I manually throw a RuntimeException in the code after the Kafka message is sent.

Sample code (producer-only transaction needed):

@Autowired
private StreamBridge streamBridge;

@Transactional
public void sendDbAndKafkaUpdate() {
    // db write here...
    
    // publish kafka message
    sendKafkaMessage();
}

private void sendKafkaMessage() {
    streamBridge.send("topic-name", messageEvent);

    //throw a RuntimeException here.
}

The application YAML configuration for enabling producer transactions:


spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: ${kafka.unique.tx.id.per.instance}  # set per service instance
            producer:
              configuration:
                retries: 1
                acks: all
    
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
                schema.registry.url: ${kafka.schema.registry.url}

I have searched the documentation, but it is not clear what the recommended approach is to handle this. Reference documentation (see the section on producer-only transactions): https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-transactional-binder

The documentation proposes the following code to enable a producer-only transaction:

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${kafka.unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionIdPrefix(txId);
    return tm;
}

I have tried this, but it doesn't work if I manually throw a RuntimeException after publishing a message to Kafka: the DB transaction gets rolled back, but the Kafka message is still sent (and consumed by the consuming application).

Questions

  1. What should the binder name be when StreamBridge is used to send a message to a topic? Does it refer to the Apache Kafka binder itself, which would mean null is fine if that is the only binder in use? Or is it related to the bindings configured in the application YAML (note: no output binding is used in this case where StreamBridge is used)?

  2. More importantly, how can I synchronize a producer-only transaction where a database update is followed by publishing a Kafka message, taking the following points into consideration:

  • The documentation referenced above suggests using a ChainedTransactionManager to synchronize transactions ("If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.") But note that ChainedTransactionManager has been deprecated.
  • Also, note that KafkaTemplate is not used directly in the application (since Spring Cloud Stream provides the abstractions)

[EDIT] Solution

Instead of setting isolation.level at the consumer binding or default level, define it at the Kafka binder configuration level as follows:

spring.cloud.stream.kafka.binder.configuration.isolation.level: read_committed

Note that in the documentation the value is sometimes given as "read-committed" (instead of "read_committed"), but that variant did not work for me.
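
In nested YAML form (placed in the consuming application's configuration), this corresponds to:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            isolation.level: read_committed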


Solution

  • For a setup like the following you don't need to define a custom Kafka transaction manager.

    @Autowired
    private StreamBridge streamBridge;
    
    @Transactional
    public void sendDbAndKafkaUpdate() {
        // db write here...
        
        // publish kafka message
        sendKafkaMessage();
    }
    
    private void sendKafkaMessage() {
        streamBridge.send("topic-name", messageEvent);
    
        //throw a RuntimeException here.
    }
    

    It should be transactional end-to-end. The @Transactional annotation will use the database transaction manager as the primary one (e.g. JpaTransactionManager); I am assuming the DB transaction manager is auto-configured by Spring Boot in your case. When the transaction interceptor intercepts the call, it starts a new DB transaction, and the method is executed under this transaction. Since you are providing the transaction-id-prefix, the StreamBridge#send operation is also performed transactionally, and the internal KafkaTemplate that StreamBridge uses synchronizes the Kafka transaction with the existing JPA transaction. Upon exiting the method, the primary transaction commits first, followed by the synchronized transactions. If an exception is thrown after the Kafka send, both transactions will be rolled back.

    Are you sure that the Kafka transaction is not rolled back? How did you verify that? In your downstream consumer, did you use an isolation.level of read_committed? (spring.cloud.stream.kafka.binder.configuration.isolation.level)
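
    For reference, here is a minimal sketch of a downstream functional consumer (java.util.function.Consumer); the MessageEvent type and the binding name are placeholders. Combined with the binder-level isolation.level setting above, it should only ever receive records from committed producer transactions.

    @Bean
    public Consumer<MessageEvent> messageEventConsumer() {
        // with isolation.level=read_committed on this application's binder,
        // records from aborted (rolled-back) producer transactions are skipped
        return event -> System.out.println("received committed event: " + event);
    }

    The binding messageEventConsumer-in-0 would still need to be mapped to the topic via spring.cloud.stream.bindings.messageEventConsumer-in-0.destination.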

    Another thing to keep in mind is that if you have an auto-configured TransactionManager in the application, you do not need to add @EnableTransactionManagement on the application, as Spring Boot already applies that.

    You don't need to use any chained transaction manager for your scenario. That is only needed if you want to change the order in which the transactions commit. For example, if you want the Kafka transaction to commit first instead of the DB one, you can use a chained TM or nest the @Transactional method calls, as sketched below. But judging by your explanation, your application does not warrant those advanced setups.
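
    For completeness, here is a rough sketch of the nesting approach, in case you ever do want the Kafka transaction to commit before the database one. It assumes the KafkaTransactionManager from the documentation snippet above is registered under the bean name "kafkaTransactionManager" (renamed so it does not replace the primary JPA transaction manager); the event type and topic name are placeholders, and both @Transactional annotations name their transaction manager explicitly to avoid ambiguity.

    // Outer bean: runs under the primary (database) transaction manager.
    @Service
    public class DbThenKafkaService {

        private final TransactionalKafkaSender kafkaSender;

        public DbThenKafkaService(TransactionalKafkaSender kafkaSender) {
            this.kafkaSender = kafkaSender;
        }

        @Transactional("transactionManager") // the auto-configured JPA transaction manager
        public void sendDbAndKafkaUpdate(Object messageEvent) {
            // db write here...
            kafkaSender.send(messageEvent); // inner Kafka transaction commits on return
        }
    }

    // Inner bean: runs under the Kafka transaction manager, so its transaction
    // commits when send() returns, i.e. before the outer database transaction.
    @Service
    public class TransactionalKafkaSender {

        private final StreamBridge streamBridge;

        public TransactionalKafkaSender(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }

        @Transactional("kafkaTransactionManager")
        public void send(Object messageEvent) {
            streamBridge.send("topic-name", messageEvent);
        }
    }

    Note that with this ordering the Kafka record is already committed if the subsequent database commit fails, so only use it when that trade-off is acceptable.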

    If things still don't work, feel free to create a small sample application where we can reproduce the issue.