I have a Spring Cloud Stream application that receives messages from RabbitMQ using the Rabbit Binder, update my database and send one or many messages. My application can be summarized as this demo app:
The problem is that it doesn't seem that @Transactional
works(or at least that's my impression) since if there's an exception the Database is rollbacked but messages are sent even the consumer/producer are configured by default as transacted.
Given that what I want to achieve is when an exception occurs I want the consumed messages go to DLQ after being retried the Database is rolled back and messages are not sent.
How can I achieve this?
This is the output of the demo application when I send a message my-input
exchange
2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
at com.example.demo.MyListener.process(DemoApplication.kt:46)
at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
Since you are publishing the failed message to the DLQ, from a Rabbit perspective, the transaction was successful and the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed.
You can't do what you want with republishToDlq
.
It will work if you use the normal DLQ mechanism (republishToDlq=false
, whereby the broker sends the original message to the DLQ) instead of republishing with the extra metadata.
If you want to republish with metadata, you could manually publish to the DLQ with a non-transactional RabbitTemplate
(so the DLQ publish doesn't get rolled back with the other publishes).
EDIT
Here is an example of how to do what you need.
A few things to note:
message_id
property (e.g. a UUID).@SpringBootApplication
@EnableBinding(Processor.class)
public class So65792643Application {
public static void main(String[] args) {
SpringApplication.run(So65792643Application.class, args);
}
@Autowired
Processor processor;
@StreamListener(Processor.INPUT)
public void in(Message<String> in) {
System.out.println(in.getPayload());
processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
int attempt = RetrySynchronizationManager.getContext().getRetryCount();
if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
System.out.println("success");
}
else {
throw new RuntimeException();
}
}
@Bean
RepublishMessageRecoverer repub(RabbitTemplate template) {
RepublishMessageRecoverer repub =
new RepublishMessageRecoverer(template, "DLX", "rk");
return repub;
}
@Bean
Queue dlq() {
return new Queue("my-output.dlq");
}
@Bean
DirectExchange dlx() {
return new DirectExchange("DLX");
}
@Bean
Binding dlqBinding() {
return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
}
@ServiceActivator(inputChannel = "my-input.group1.errors")
void errorHandler(ErrorMessage message) {
MessagingException mex = (MessagingException) message.getPayload();
throw mex;
}
@RabbitListener(queues = "my-output.dlq")
void dlqListen(Message<String> in) {
System.out.println("DLQ:" + in);
}
@RabbitListener(queues = "my-output.group2")
void outListen(String in) {
if (in.equals("OKAFTERRETRY")) {
System.out.println(in);
}
else {
System.out.println("Should not see this:" + in);
}
}
/*
* We must move retries from the binder to stateful retries in the container so that
* each retry is rolled back, to avoid multiple publishes to output.
* See max-attempts: 1 in the yaml.
* In order for stateful retry to work, inbound messages must have a unique message_id
* property.
*/
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
return (container, destinationName, group) -> {
if ("group1".equals(group)) {
container.setAdviceChain(RetryInterceptorBuilder.stateful()
.backOffOptions(1000, 2.0, 10000)
.maxAttempts(2)
.recoverer(recoverer(repub))
.keyGenerator(args -> {
// or generate a unique key some other way
return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
.getMessageId();
})
.build());
}
};
}
private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
return (args, cause) -> {
repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
throw new AmqpRejectAndDontRequeueException(cause);
};
}
}
spring:
cloud:
stream:
rabbit:
default:
producer:
transacted: true
consumer:
transacted: true
requeue-rejected: true
bindings:
input:
destination: my-input
group: group1
consumer:
max-attempts: 1
output:
destination: my-output
producer:
required-groups: group2
okAfterRetry
2021-01-20 12:45:24.385 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY
notOkAfterRetry
2021-01-20 12:45:39.336 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...