Tags: spring-boot, apache-kafka, spring-data-jpa, spring-transactions

Group sending a Kafka message and DB updates in one transaction in Spring Boot


I need to perform several operations in one transaction:

  • produce kafka message
  • update Table A
  • update Table B

It is acceptable if the message is sent and neither table (A nor B) is updated. It is not acceptable if the message is produced and only one of the tables is updated.

I'm trying to achieve this using the @Transactional annotation:


import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
    var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

    entityService.approve(entity.getTransactionId());
    logService.logApproval(entity);
    producer.send(approvalEvent);
}

Am I doing this right?


Solution

  • The problem with the approach above is that you are interacting with two distinct systems (the database and the message queue) in what is meant to be one transaction. Handling every combination in which the operation succeeds on one system and fails on the other makes the solution complex.

    There is a pattern in the microservices world for exactly this scenario. It is called the outbox pattern.

    You can read more about it here.

    In short, you add a table called outbox to your database; it holds the messages that are to be published to the message queue.

    In the same DB transaction that adds or updates the entity, you also insert a row into the outbox table containing the details of the operation on the entity.

    Then you asynchronously read the rows from the outbox table and publish them to the message queue, either via polling or using change data capture. See a sample implementation here using Debezium.

    Your transactional code would then look like this:

    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void handle(Event approvalEvent) {
        var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
    
        entityService.approve(entity.getTransactionId());
        logService.logApproval(entity);
        // The outbox table holds the records to be published to the MQ;
        // this insert commits or rolls back together with the updates above
        outboxRepo.save(approvalEvent);
    }
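
The answer leaves the asynchronous publishing step to the reader, so here is a minimal sketch of the polling variant. It is an illustration under stated assumptions, not a definitive implementation: it uses an in-memory list in place of the real outbox table and a plain Consumer in place of the Kafka producer. The names OutboxRelaySketch, OutboxRow, Outbox and publishPending are hypothetical and do not come from the question or from any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the outbox table and its polling relay. */
class OutboxRelaySketch {

    /** One row of the outbox table: a payload plus a "published" flag. */
    static final class OutboxRow {
        final long id;
        final String payload;
        boolean published; // set to true only after a successful send

        OutboxRow(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Stand-in for the outbox table; the real one would live in the same database as tables A and B. */
    static final class Outbox {
        private final List<OutboxRow> rows = new ArrayList<>();
        private long nextId = 1;

        /** Called inside the business transaction instead of producer.send(...). */
        OutboxRow save(String payload) {
            OutboxRow row = new OutboxRow(nextId++, payload);
            rows.add(row);
            return row;
        }

        /** Rows that have not been published yet, in insertion order. */
        List<OutboxRow> findUnpublished() {
            List<OutboxRow> pending = new ArrayList<>();
            for (OutboxRow row : rows) {
                if (!row.published) {
                    pending.add(row);
                }
            }
            return pending;
        }
    }

    /**
     * One polling pass: publish each pending row, then mark it as published.
     * Stops at the first failure so that ordering is preserved and the failed
     * row is retried on the next pass. Returns the number of rows published.
     */
    static int publishPending(Outbox outbox, Consumer<String> publisher) {
        int publishedCount = 0;
        for (OutboxRow row : outbox.findUnpublished()) {
            try {
                publisher.accept(row.payload); // assumption: wraps the real Kafka send
            } catch (RuntimeException e) {
                break; // leave this row pending for the next poll
            }
            row.published = true;
            publishedCount++;
        }
        return publishedCount;
    }
}
```

In a Spring Boot application, a pass like publishPending would typically run on a timer, for example from a method annotated with @Scheduled, and the "published" flag would be updated in the database. Because a crash between the send and the flag update causes the row to be sent again, this style of relay gives at-least-once delivery, so consumers should tolerate duplicate messages.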