Tags: rabbitmq, spring-amqp, spring-rabbit

RabbitTemplate Receive Operation in a Distributed Transaction with a Database


I need to create a simple application that:

  1. Reads the first available message from a RabbitMQ queue
  2. Saves the object into a database table

The application is an MVC application and exposes an HTTP endpoint that lets the user trigger the operation.

All operations in the call need to participate in a distributed transaction, and hence run as one unit of work.

The classes look like this:

Queue operation class:

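public class ReadFromQueue {
  // rabbitTemplate is assumed to be an injected RabbitTemplate field.
  // Returns the converted message (null if the queue is empty), or "failure" if the broker call fails.
  public String readMessage(String queueName) {
    try {
      return (String) rabbitTemplate.receiveAndConvert(queueName);
    } catch (AmqpException ex) {
      return "failure";
    }
  }
}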

Database operation class:

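public class SavetoDatabase {
  // pigRepository (Spring Data repository) and objectMapper (Jackson) are assumed to be injected fields.
  public String saveObject(String receivedPig) {
    try {
      // Deserialize the JSON message into a Pig and persist it immediately.
      pigRepository.saveAndFlush(objectMapper.readValue(receivedPig, Pig.class));
    } catch (Exception ex) {
      ex.printStackTrace();
      return "failure";
    }
    return "success";
  }
}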

The controller class:

@RestController
public class MyController {

  // readFromQueue, saveToDatabase and queueName are assumed to be injected fields.

  @Transactional
  @RequestMapping("/read")
  public String readAndSavePigFromQueue() {
    String messageFromQueue = readFromQueue.readMessage(queueName);

    // Some checks on returned message

    String databaseSaveResult = saveToDatabase.saveObject(messageFromQueue);

    // Compare string contents with equals(); == only compares references.
    if ("failure".equals(databaseSaveResult)) {
      return "Failed Transaction";
    }

    return "Successful transaction";
  }
}

The challenge is how to control this complete operation transactionally across these distributed resources.

RabbitMQ is said not to support distributed transactions, hence the concern.

If the save to the database fails, I want the message to be rolled back to the queue and left in its original state.

Any suggestions?


Solution

  • After some more searching, this article helped me find the solution: http://docs.spring.io/spring-amqp/docs/1.2.0.RELEASE/reference/html/amqp.html#d4e602. It lies in the channelTransacted flag of RabbitTemplate (set with setChannelTransacted, read with isChannelTransacted).

    Make sure the flag is set to true if you want RabbitTemplate operations to participate in transactions.

    The bean configuration below achieves this.

        @Bean
        public ConnectionFactory connectionFactory(){
            return new CachingConnectionFactory();
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
            rabbitTemplate.setChannelTransacted(true);
            return rabbitTemplate;
        }
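
    To make the intended behaviour concrete, here is a toy, in-memory Java model of what a transacted receive is meant to give you: the message is handed out provisionally, it is acknowledged when the unit of work commits, and it goes back to the queue when the unit of work rolls back. This is a sketch of the semantics only. ToyQueue, readAndSave and the Consumer standing in for the database save are hypothetical names invented for illustration; none of them are Spring AMQP or RabbitMQ APIs, and in the real application that work is left to RabbitTemplate and the transaction manager.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model; none of these types belong to Spring AMQP or RabbitMQ. */
class TransactedReceiveSketch {

    /** Hypothetical stand-in for a queue whose deliveries stay unacknowledged until the unit of work ends. */
    static final class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private final Deque<String> unacked = new ArrayDeque<>();

        void publish(String message) {
            ready.addLast(message);
        }

        /** Hands out the head message but remembers it until commit() or rollback(). */
        String receive() {
            String message = ready.pollFirst();
            if (message != null) {
                unacked.addLast(message);
            }
            return message;
        }

        /** Commit: the pending deliveries are acknowledged and gone for good. */
        void commit() {
            unacked.clear();
        }

        /** Rollback: pending deliveries return to the head of the queue in their original order. */
        void rollback() {
            while (!unacked.isEmpty()) {
                ready.addFirst(unacked.pollLast());
            }
        }

        int size() {
            return ready.size();
        }
    }

    /** Runs receive + save as one unit of work; returns true only if both steps succeeded. */
    static boolean readAndSave(ToyQueue queue, Consumer<String> saveToDatabase) {
        String message = queue.receive();
        if (message == null) {
            return false; // empty queue, nothing to commit or roll back
        }
        try {
            saveToDatabase.accept(message);
            queue.commit();
            return true;
        } catch (RuntimeException ex) {
            // The save failed, so the message must not be lost.
            queue.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish("{\"name\":\"Babe\"}");
        List<String> table = new ArrayList<>();

        // First attempt: the "database" throws, so the message is requeued.
        boolean first = readAndSave(queue, m -> { throw new IllegalStateException("database down"); });
        System.out.println(first + " queue=" + queue.size() + " rows=" + table.size());

        // Second attempt: the save works, so the message is consumed for good.
        boolean second = readAndSave(queue, table::add);
        System.out.println(second + " queue=" + queue.size() + " rows=" + table.size());
    }
}
```

    In the real application the same idea depends on the failure being visible to the transaction: by default Spring's @Transactional rolls back only when a RuntimeException or Error propagates out of the annotated method, so a save failure that is caught and turned into a "failure" return value may not trigger the rollback.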
    

    If anyone knows a better way to implement this, please post a reply.