spring-boot rabbitmq spring-amqp

RabbitMQ sending message in transaction


Is it possible to run the code below in a transaction, so that if an exception is thrown during the business processing, the message we already sent to the queue is rolled back?

rabbitTemplate.convertAndSend("queue1", data);

//do some processing

rabbitTemplate.convertAndSend("queue2", data);

The need for this: something might go wrong after the message is sent to queue1, so that we are not able to send the message to queue2. Or there might be a network problem or some other issue while sending a message to a queue.


Solution

  • If this code is running on a listener container thread (onMessage() or @RabbitListener) and both the container and the template have setChannelTransacted(true), then the publishing (and the delivery) will run in the same transaction; throwing an exception will cause everything to be rolled back.
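
    As a rough sketch of the listener-container case (not part of the original answer), the configuration below marks both the container factory and the template as transacted. The queue name "inbound", the Relay class and the process() method are hypothetical; rabbitListenerContainerFactory is the default bean name that @RabbitListener looks up.

    ```java
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;

    @Configuration
    public class TransactedListenerConfig {

        // Container side: deliveries (and their acks) take part in a channel transaction.
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
                ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setChannelTransacted(true);
            return factory;
        }

        // Template side: publishes made on the listener thread join that transaction.
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setChannelTransacted(true);
            return template;
        }
    }

    // Hypothetical listener; "inbound" is an assumed queue name.
    @Component
    class Relay {

        private final RabbitTemplate template;

        Relay(RabbitTemplate template) {
            this.template = template;
        }

        @RabbitListener(queues = "inbound")
        public void handle(String data) {
            this.template.convertAndSend("queue1", data);
            process(data); // an exception here rolls back the send to queue1
            this.template.convertAndSend("queue2", data);
        }

        private void process(String data) {
            // placeholder for the business processing in the question
        }
    }
    ```

    If process() throws, neither send is committed. Depending on the container's requeue settings, the inbound message is typically redelivered.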

    If this is in some arbitrary Java class (not running on a container thread), then you need to start the transaction before the method runs, for example with @Transactional:

        @Transactional
        public void send(String in) {
            this.template.convertAndSend("foo", in);
            if (in.equals("foo")) {
                throw new RuntimeException("test");
            }
            this.template.convertAndSend("bar", in);
        }
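
    To make the rollback behaviour concrete, here is a toy in-memory model (plain Java, no broker, and not the RabbitMQ or Spring API) of what the transaction manager effectively does around send(): publishes are staged and only become visible on commit. ToyTxChannel and its methods are invented purely for illustration.

    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy model only: mimics "staged until commit" semantics, nothing more.
    final class TxChannelSketch {

        /** Hypothetical stand-in for a transacted channel plus the broker's queues. */
        static final class ToyTxChannel {
            private final Map<String, Deque<String>> queues = new HashMap<>();
            private final List<String[]> pending = new ArrayList<>(); // {queue, body}

            void publish(String queue, String body) {
                pending.add(new String[] {queue, body}); // not visible to consumers yet
            }

            void commit() {
                for (String[] p : pending) {
                    queues.computeIfAbsent(p[0], k -> new ArrayDeque<>()).add(p[1]);
                }
                pending.clear();
            }

            void rollback() {
                pending.clear(); // staged publishes are discarded
            }

            String receive(String queue) {
                Deque<String> q = queues.get(queue);
                return q == null ? null : q.poll();
            }
        }

        /** Same body as send() above, with commit and rollback made explicit. */
        static void send(ToyTxChannel channel, String in) {
            try {
                channel.publish("foo", in);
                if (in.equals("foo")) {
                    throw new RuntimeException("test");
                }
                channel.publish("bar", in);
                channel.commit();
            }
            catch (RuntimeException e) {
                channel.rollback();
                throw e;
            }
        }

        public static void main(String[] args) {
            ToyTxChannel channel = new ToyTxChannel();
            try {
                send(channel, "foo");
            }
            catch (RuntimeException expected) {
                // rolled back: nothing reached the "foo" queue
            }
            System.out.println(channel.receive("foo")); // null
            send(channel, "bar");
            System.out.println(channel.receive("foo")); // bar
            System.out.println(channel.receive("bar")); // bar
        }
    }
    ```

    In the real application the try/commit/rollback wrapper is not written by hand; the @Transactional proxy and the transaction manager play that role.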
    

    Here's a full Spring Boot application that demonstrates the feature...

    @SpringBootApplication
    @EnableTransactionManagement
    public class So40749877Application {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
            Foo foo = context.getBean(Foo.class);
            try {
                foo.send("foo");
            }
            catch (Exception e) {
                // expected: send("foo") throws, so its publish is rolled back
            }
            foo.send("bar");
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            // should not get any "foo" payloads; both receives print "bar"
            System.out.println(template.receiveAndConvert("foo", 10_000));
            System.out.println(template.receiveAndConvert("bar", 10_000));
            // should be null
            System.out.println(template.receiveAndConvert("foo", 0));
            RabbitAdmin admin = context.getBean(RabbitAdmin.class);
            admin.deleteQueue("foo");
            admin.deleteQueue("bar");
            context.close();
        }
    
        @Bean
        public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setChannelTransacted(true);
            return rabbitTemplate;
        }
    
        @Bean
        public Queue foo() {
            return new Queue("foo");
        }
    
        @Bean
        public Queue bar() {
            return new Queue("bar");
        }
    
        @Bean
        public Foo fooBean() {
            return new Foo();
        }
    
        @Bean
        public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    
        public static class Foo {
    
            @Autowired
            private RabbitTemplate template;
    
            @Transactional
            public void send(String in) {
                this.template.convertAndSend("foo", in);
                if (in.equals("foo")) {
                    throw new RuntimeException("test");
                }
                this.template.convertAndSend("bar", in);
            }
    
        }
    
    }
    

    EDIT

    Regarding transactions on the consumer side: this does not generally apply when using Spring, because Spring manages the transaction, but when using the client directly...

    Connection connection = cf.createConnection();
    Channel channel = connection.createChannel(true);
    channel.basicQos(1);
    channel.txSelect();
    CountDownLatch latch = new CountDownLatch(1);
    channel.basicConsume("foo", new DefaultConsumer(channel) {
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                byte[] body) throws IOException {
            System.out.println(new String(body));
    
            getChannel().txRollback(); // delivery won't be requeued; remains unacked
    
            if (envelope.isRedeliver()) {
                getChannel().basicAck(envelope.getDeliveryTag(), false);
                getChannel().txCommit(); // commit the ack so the message is removed
                getChannel().basicCancel(consumerTag);
                latch.countDown();
            }
            else { // first time, let's requeue
                getChannel().basicReject(envelope.getDeliveryTag(), true);
                getChannel().txCommit(); // commit the reject so the message will be requeued
            }
        }
    
    });
    latch.await();
    channel.close();
    connection.close();
    

    Note that the txRollback does nothing in this case; only the ack (or reject) is transactional.