Search code examples
spring-bootrabbitmqspring-cloud-streamspring-transactions

Spring Cloud Stream with RabbitMQ binder and Transactional consumer/producer with DB operations


I have a Spring Cloud Stream application that receives messages from RabbitMQ using the Rabbit Binder, update my database and send one or many messages. My application can be summarized as this demo app:

The problem is that it doesn't seem that @Transactional works(or at least that's my impression) since if there's an exception the Database is rollbacked but messages are sent even the consumer/producer are configured by default as transacted.

Given that what I want to achieve is when an exception occurs I want the consumed messages go to DLQ after being retried the Database is rolled back and messages are not sent.

How can I achieve this?

This is the output of the demo application when I send a message my-input exchange

2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
    at com.example.demo.MyListener.process(DemoApplication.kt:46)
    at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto


Solution

  • Since you are publishing the failed message to the DLQ, from a Rabbit perspective, the transaction was successful and the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed.

    You can't do what you want with republishToDlq.

    It will work if you use the normal DLQ mechanism (republishToDlq=false, whereby the broker sends the original message to the DLQ) instead of republishing with the extra metadata.

    If you want to republish with metadata, you could manually publish to the DLQ with a non-transactional RabbitTemplate (so the DLQ publish doesn't get rolled back with the other publishes).

    EDIT

    Here is an example of how to do what you need.

    A few things to note:

    1. We have to add an error handler to rethrow the exception.
    2. We have to move retries to the listener container instead of the binder; otherwise, the retries will occur within the transaction and if retries are successful, multiple messages would be deposited on the output queue.
    3. For stateful retry to work, we must be able to uniquely identify each message; the simplest solution is to have the sender set a unique message_id property (e.g. a UUID).
    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class So65792643Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So65792643Application.class, args);
        }
    
        @Autowired
        Processor processor;
    
        @StreamListener(Processor.INPUT)
        public void in(Message<String> in) {
            System.out.println(in.getPayload());
            processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
            int attempt = RetrySynchronizationManager.getContext().getRetryCount();
            if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
                System.out.println("success");
            }
            else {
                throw new RuntimeException();
            }
        }
    
        @Bean
        RepublishMessageRecoverer repub(RabbitTemplate template) {
            RepublishMessageRecoverer repub =
                    new RepublishMessageRecoverer(template, "DLX", "rk");
            return repub;
        }
    
        @Bean
        Queue dlq() {
            return new Queue("my-output.dlq");
        }
    
        @Bean
        DirectExchange dlx() {
            return new DirectExchange("DLX");
        }
    
        @Bean
        Binding dlqBinding() {
            return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
        }
    
        @ServiceActivator(inputChannel = "my-input.group1.errors")
        void errorHandler(ErrorMessage message) {
            MessagingException mex = (MessagingException) message.getPayload();
            throw mex;
        }
    
        @RabbitListener(queues = "my-output.dlq")
        void dlqListen(Message<String> in) {
            System.out.println("DLQ:" + in);
        }
    
        @RabbitListener(queues = "my-output.group2")
        void outListen(String in) {
            if (in.equals("OKAFTERRETRY")) {
                System.out.println(in);
            }
            else {
                System.out.println("Should not see this:" + in);
            }
        }
    
        /*
         * We must move retries from the binder to stateful retries in the container so that
         * each retry is rolled back, to avoid multiple publishes to output.
         * See max-attempts: 1 in the yaml.
         * In order for stateful retry to work, inbound messages must have a unique message_id
         * property.
         */
        @Bean
        ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
            return (container, destinationName, group) -> {
                if ("group1".equals(group)) {
                    container.setAdviceChain(RetryInterceptorBuilder.stateful()
                            .backOffOptions(1000, 2.0, 10000)
                            .maxAttempts(2)
                            .recoverer(recoverer(repub))
                            .keyGenerator(args -> {
                                // or generate a unique key some other way
                                return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
                                        .getMessageId();
                            })
                            .build());
                }
            };
        }
    
        private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
            return (args, cause) -> {
                repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
                throw new AmqpRejectAndDontRequeueException(cause);
            };
        }
    
    }
    
    spring:
      cloud:
        stream:
          rabbit:
            default:
              producer:
                transacted: true
              consumer:
                transacted: true
                requeue-rejected: true
          bindings:
            input:
              destination: my-input
              group: group1
              consumer:
                max-attempts: 1
            output:
              destination: my-output
              producer:
                required-groups: group2
    
    okAfterRetry
    2021-01-20 12:45:24.385  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    ...
    okAfterRetry
    success
    OKAFTERRETRY
    
    notOkAfterRetry
    2021-01-20 12:45:39.336  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    ...
    notOkAfterRetry
    2021-01-20 12:45:39.339  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
    ...
    DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...