Search code examples
springspring-bootrabbitmqspring-cloud-streamspring-rabbit

Transaction in Spring cloud Stream


Problem: I am trying to read a big file line by line and putting the message in a RabbitMQ. I want to commit to rabbitMQ at the end of the file. If any record in the file is bad, then I want to revoke the messages published to the queue.

Technologies: Spring boot, Spring cloud stream, RabbitMQ

Could you please help me in implementing this transition stuff. I know how to read a file and publish to a queue using spring cloud stream.

Edit:

  @Transactional
  public void sendToQueue(List<Data> dataList) {

      for(Data data:dataList)
      {
          this.output.send(MessageBuilder.withPayload(data).build());
          counter++; // I can see message getting published in the queue though management plugin
      }
      LOGGER.debug("message sent to Q2");

  }

Here is my config:

spring: 
   cloud:    
      stream:
        bindings:
           # Q1 input channel
           tpi_q1_input:
            destination: TPI_Q1
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 output channel  
           tpi_q2_output:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 input channel
           tpi_q2_input:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService     
        binders:
          local_rabbit:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
                  virtual-host: /
          rabbit:
            bindings:
                  tpi_q2_output:
                    producer:
                          #autoBindDlq: true
                          transacted: true
                          #batchingEnabled: true
                  tpi_q2_input:  
                   consumer:
                        acknowledgeMode: AUTO
                        #autoBindDlq: true
                        #recoveryInterval: 5000
                        transacted: true       

spring.cloud.stream.default-binder: local_rabbit

Java config

@EnableTransactionManagement
public class QueueConfig {

  @Bean
  public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
  }
}

Receiver

@StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
  @Transactional
  public void receiveMesssage(Data data) {

    logger.info("Message Received in Q2:");
  }

Q2 is the queue getting published in transation


Solution

    1. Configure the producer to use transactions ...producer.transacted=true

    2. Publish the messages within the scope of a transaction (using the RabbitTransactionManager).

    Use normal Spring transaction mechanisms for #2 (@Transacted annotation or a TransactionTemplate).

    The transaction will commit if you exit normally, or roll back if you throw an exception.

    EDIT

    Example:

    @SpringBootApplication
    @EnableBinding(Source.class)
    @EnableTransactionManagement
    public class So50372319Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50372319Application.class, args).close();
        }
    
        @Bean
        public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
                TransactionalSender sender) {
            admin.deleteQueue("so50372319.group");
            admin.declareQueue(new Queue("so50372319.group"));
            admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
            return args -> {
                sender.send("foo", "bar");
                System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
                System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
                try {
                    sender.send("baz", "qux");
                }
                catch (RuntimeException e) {
                    System.out.println(e.getMessage());
                }
                System.out.println("Received: " + template.receive("so50372319.group", 3_000));
            };
        }
    
        @Bean
        public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
            return new RabbitTransactionManager(cf);
        }
    
    }
    
    @Component
    class TransactionalSender {
    
        private final MessageChannel output;
    
        public TransactionalSender(MessageChannel output) {
            this.output = output;
        }
    
        @Transactional
        public void send(String... data) {
            for (String datum : data) {
                this.output.send(new GenericMessage<>(datum));
                if ("qux".equals(datum)) {
                    throw new RuntimeException("fail");
                }
            }
        }
    
    }
    

    and

    spring.cloud.stream.bindings.output.destination=output
    spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
    

    and

    Received: foo
    Received: bar
    fail
    Received: null