Tags: spring, transactions, rabbitmq, spring-amqp, spring-rabbit

Messages are not committed (lost) when using @TransactionalEventListener to send a message in a JPA transaction


Background of the code:

To replicate a production scenario, I created a dummy app that saves a record to the database inside a transaction and, in the same method, calls publishEvent; the listener for that event then sends a message to RabbitMQ.

Classes and usages

The transaction starts in this method:

@Override
    @Transactional
    public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
        return createEmployee(empDto);
    }

This method saves the record to the database and also calls publishEvent:

@Override
    public EmpDTO createEmployee(EmpDTO empDTO) {
        
        EmpEntity empEntity = new EmpEntity();
        BeanUtils.copyProperties(empDTO, empEntity);

        System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >>  Saving data for employee " + empDTO.getEmpCode());

        // Record data into a database
        empEntity = empRepository.save(empEntity);  
        
        // Sending event , this will send the message.
        eventPublisher.publishEvent(new ActivityEvent(empDTO));
        
        return createResponse(empDTO, empEntity);
    }

This is ActivityEvent:

import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
    public ActivityEvent(EmpDTO source) {
        super(source);
    }
}

And this is the TransactionalEventListener for the above event:

    //@Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onActivitySave(ActivityEvent activityEvent) {
        System.out.println("Activity got event ... Sending message .. ");
        // The payload is the EmpDTO that was passed to the event as its source.
        kRabbitTemplate.convertAndSend(exchange, routingkey, activityEvent.getSource());
    }

kRabbitTemplate is a bean configured like this:

@Bean
    public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
        kRabbitTemplate.setChannelTransacted(true);
        kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
        return kRabbitTemplate;
    }

Problem Definition

When I save a record and send a message to RabbitMQ using the above flow, my messages are not delivered to the server, meaning they are lost.

What I understand about transactions in Spring AMQP:

  1. If the template is transacted but convertAndSend is not called from within a Spring/JPA transaction, the message is committed inside the template's convertAndSend method.
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
    if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }
  2. But if the template is transacted and convertAndSend is called from within a Spring/JPA transaction, isChannelLocallyTransacted in doSend evaluates to false, and the commit is performed by the method that initiated the Spring/JPA transaction.
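The two cases above can be modelled with a small, self-contained sketch. This is not Spring AMQP code: the class TransactedSendModel, its fields, and its methods are hypothetical names, and the boolean externalTransaction merely stands in for "a synchronized Spring transaction is active".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT Spring AMQP code; every name here is hypothetical.
final class TransactedSendModel {

    /** Messages the broker has accepted as committed. */
    final List<String> committed = new ArrayList<>();
    /** Messages published on the transacted channel but not yet committed. */
    final List<String> pending = new ArrayList<>();
    /** Stands in for "a synchronized Spring transaction is active". */
    boolean externalTransaction;

    /** Models a send on a template with channelTransacted = true. */
    void send(String message) {
        pending.add(message);
        boolean locallyTransacted = !externalTransaction;
        if (locallyTransacted) {
            commitPending(); // case 1: the template commits by itself
        }
        // case 2: the commit is left to the surrounding transaction
    }

    /** Models the surrounding transaction committing the channel when it completes. */
    void commitExternalTransaction() {
        commitPending();
        externalTransaction = false;
    }

    private void commitPending() {
        committed.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        TransactedSendModel model = new TransactedSendModel();
        model.send("outside");                 // committed immediately
        model.externalTransaction = true;
        model.send("inside");                  // stays pending
        System.out.println("committed=" + model.committed + ", pending=" + model.pending);
        model.commitExternalTransaction();     // now both are committed
        System.out.println("committed=" + model.committed + ", pending=" + model.pending);
    }
}
```

In this model a message sent in case 2 is only as safe as the surrounding transaction's commit step: if that step never runs for the channel, the message stays pending.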

What I found while investigating the message loss in the above code:

  • A Spring transaction was active when I called convertAndSend, so the message was supposed to be committed as part of the Spring transaction.
  • To do that, RabbitTemplate binds the resource and registers a synchronization before sending the message, in bindResourceToTransaction of org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.
    public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
            ConnectionFactory connectionFactory, boolean synched) {
        if (TransactionSynchronizationManager.hasResource(connectionFactory)
                || !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
            return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
        }
        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
        resourceHolder.setSynchronizedWithTransaction(true);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
                    connectionFactory));
        }
        return resourceHolder;
    }

In my case, after the resource is bound, registerSynchronization is never called because TransactionSynchronizationManager.isSynchronizationActive() returns false. Because no synchronization is registered, the RabbitMQ message is never committed with the Spring transaction: AbstractPlatformTransactionManager.triggerAfterCompletion is what triggers RabbitMQ's commit, once for each registered synchronization.

Problems I faced because of this:

  • The message was not committed in the Spring transaction, so the message was lost.
  • Because the resource was bound in bindResourceToTransaction, it remained bound and prevented a resource from being bound for any other message sent on the same thread.

Possible root cause of TransactionSynchronizationManager.isSynchronizationActive()==false

  • I found that the method that starts the transaction had already removed the synchronizations in triggerAfterCompletion of org.springframework.transaction.support.AbstractPlatformTransactionManager, because status.isNewSynchronization() evaluated to true after the DB operation. (This does not usually happen when I call convertAndSend without an ApplicationEvent.)
    private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
        if (status.isNewSynchronization()) {
            List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
            TransactionSynchronizationManager.clearSynchronization();
            if (!status.hasTransaction() || status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.trace("Triggering afterCompletion synchronization");
                }
                // No transaction or new transaction for the current scope ->
                // invoke the afterCompletion callbacks immediately
                invokeAfterCompletion(synchronizations, completionStatus);
            }
            else if (!synchronizations.isEmpty()) {
                // Existing transaction that we participate in, controlled outside
                // of the scope of this Spring transaction manager -> try to register
                // an afterCompletion callback with the existing (JTA) transaction.
                registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
            }
        }
    }
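The ordering visible in triggerAfterCompletion (synchronizations are cleared first, then the afterCompletion callbacks are invoked, while the transaction flag is reset only later during cleanup) can be reproduced with a minimal model. This is a sketch under stated assumptions, not Spring's implementation: MiniTxManager, Callback, and the recorded strings are hypothetical, and it assumes, as the behaviour described above suggests, that an AFTER_COMMIT listener runs as one of the afterCompletion callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the commit sequence, NOT Spring's implementation.
// MiniTxManager, Callback and the recorded strings are hypothetical names.
final class MiniTxManager {

    interface Callback {
        default void beforeCommit() {}
        default void afterCompletion() {}
    }

    boolean actualTransactionActive;
    List<Callback> synchronizations; // null means "synchronization is not active"

    boolean isSynchronizationActive() {
        return synchronizations != null;
    }

    void begin() {
        actualTransactionActive = true;
        synchronizations = new ArrayList<>();
    }

    void register(Callback callback) {
        if (!isSynchronizationActive()) {
            throw new IllegalStateException("Synchronization is not active");
        }
        synchronizations.add(callback);
    }

    void commit() {
        try {
            for (Callback c : new ArrayList<>(synchronizations)) {
                c.beforeCommit();
            }
            // ... the database commit would happen here ...
            List<Callback> snapshot = synchronizations;
            synchronizations = null; // cleared BEFORE the callbacks are invoked
            for (Callback c : snapshot) {
                c.afterCompletion();
            }
        } finally {
            actualTransactionActive = false; // cleanup happens last
        }
    }

    String state() {
        return "txActive=" + actualTransactionActive + ", syncActive=" + isSynchronizationActive();
    }

    /** Records the state a callback observes in each phase of one transaction. */
    static List<String> observe() {
        MiniTxManager tx = new MiniTxManager();
        List<String> seen = new ArrayList<>();
        tx.begin();
        tx.register(new Callback() {
            @Override public void beforeCommit() { seen.add("beforeCommit: " + tx.state()); }
            @Override public void afterCompletion() { seen.add("afterCompletion: " + tx.state()); }
        });
        tx.commit();
        seen.add("afterwards: " + tx.state());
        return seen;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Running main prints:

    beforeCommit: txActive=true, syncActive=true
    afterCompletion: txActive=true, syncActive=false
    afterwards: txActive=false, syncActive=false

The middle line is the state the listener observes in this model: a transaction that still reports itself as active, but with no synchronization left to register against.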

What I did to work around this issue

I simply added @Transactional(propagation = Propagation.REQUIRES_NEW) alongside @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) on the onActivitySave method, and it worked because a new transaction was started.

What I need to know

  1. Why is status.isNewSynchronization() true in triggerAfterCompletion when using ApplicationEvent?
  2. If the transaction was supposed to end in the parent method, why did I get TransactionSynchronizationManager.isActualTransactionActive()==true in the listener class?
  3. If the actual transaction is still active, should the synchronization have been removed?
  4. In bindResourceToTransaction, does Spring AMQP allow for an active transaction without synchronization? If so, why not initialize synchronization when it is not active?
  5. If I propagate a new transaction, I lose the parent transaction. Is there a better way to do this?

Please help me with this. It is a hot production issue, and I am not sure about the fix I have applied.


Solution

  • This is a bug; the RabbitMQ transaction code pre-dates the @TransactionalEventListener code by many years.

    The problem is that, with this configuration, we are in a quasi-transactional state: while there is indeed a transaction in progress, the synchronizations have already been cleared because the transaction has already committed.

    Using @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) works.

    I see you already raised an issue:

    https://github.com/spring-projects/spring-amqp/issues/1309

    In future, it's best to ask questions here, or raise an issue if you feel there is a bug. Don't do both.
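To illustrate why the BEFORE_COMMIT phase works while AFTER_COMMIT loses the message, here is a minimal model of a transacted send that follows the same decision as bindResourceToTransaction. It is a sketch, not Spring or Spring AMQP code: ListenerPhaseModel and all of its members are hypothetical, and the broker commit is reduced to moving a message from a pending list to a committed list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT Spring or Spring AMQP code; every name is hypothetical.
final class ListenerPhaseModel {

    interface Callback {
        default void beforeCommit() {}
        default void afterCompletion() {}
    }

    boolean actualTransactionActive;
    List<Callback> synchronizations;   // null = synchronization is not active
    boolean resourceBound;             // models the resource holder bound to the thread
    final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    /** Models a transacted send that mirrors the bindResourceToTransaction decision. */
    void send(String message) {
        pending.add(message);
        if (!actualTransactionActive) {        // no transaction: commit locally
            commitChannel();
            return;
        }
        if (!resourceBound) {
            resourceBound = true;
            if (synchronizations != null) {    // registered only while synchronization is active
                synchronizations.add(new Callback() {
                    @Override public void afterCompletion() {
                        commitChannel();
                        resourceBound = false;
                    }
                });
            }
        }
    }

    void commitChannel() {
        committed.addAll(pending);
        pending.clear();
    }

    /** Runs one transaction whose listener sends a message in the chosen phase. */
    void runTransaction(boolean sendBeforeCommit, String message) {
        actualTransactionActive = true;
        synchronizations = new ArrayList<>();
        synchronizations.add(new Callback() {
            @Override public void beforeCommit() { if (sendBeforeCommit) send(message); }
            @Override public void afterCompletion() { if (!sendBeforeCommit) send(message); }
        });
        for (Callback c : new ArrayList<>(synchronizations)) {
            c.beforeCommit();
        }
        List<Callback> snapshot = synchronizations;
        synchronizations = null;               // cleared before afterCompletion callbacks run
        for (Callback c : snapshot) {
            c.afterCompletion();
        }
        actualTransactionActive = false;
    }

    public static void main(String[] args) {
        ListenerPhaseModel before = new ListenerPhaseModel();
        before.runTransaction(true, "m1");
        System.out.println("BEFORE_COMMIT: committed=" + before.committed + ", bound=" + before.resourceBound);

        ListenerPhaseModel after = new ListenerPhaseModel();
        after.runTransaction(false, "m2");
        System.out.println("AFTER_COMMIT: pending=" + after.pending + ", bound=" + after.resourceBound);
    }
}
```

In the model, the message sent before commit gets its commit callback registered and is committed, whereas the message sent after completion stays pending and leaves the resource bound, which matches both symptoms described in the question.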