I'm using spring-amqp's (latest version) rabbitTemplate.sendAndReceive(exchange, routingKey, message)
method for sending messages. RabbitTemplate is configured with Jackson2JsonMessageConverter.
If I send a malformed json, I can see that message conversion fails with org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
, as expected.
However, the sendAndReceive method doesn't abort and continues execution until a replyTimeout is hit. To compare, if there is an error in my @RabbitListener annotated method, then sendAndReceive aborts instantly and returns the exception.
Is there any way to tell spring to abort sendAndReceive in case of conversion exceptions?
2021-05-18 17:07:38.188 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler] :: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325)
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
at [Source: (String)"{sd}"; line: 1, column: 81]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
... 18 common frames omitted
2021-05-18 17:07:38.188 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] :: Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{sd}' MessageProperties [headers={centreId=0, deliveryTypeId=null, path=/somePath, privileges=[], salespointId=0, insuranceId=null, promoter=null, userName=null, parameters={}, userId=null, superCentreId=0}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA4NzA3MTY3MAAAaScAAAABYJ9TZA==.YoDsH6nkpNiLYvA4h8716g==, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=someExchange, receivedRoutingKey=someKey, deliveryTag=1, consumerTag=amq.ctag-lHGwtgImO4ohGFCAHuKkLg, consumerQueue=someQueue])
2021-05-18 17:07:38.188 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] :: Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
... 6 common frames omitted
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325)
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
at [Source: (String)"{sd}"; line: 1, column: 81]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
... 18 common frames omitted
EDIT 1: It seems this is a bug in spring-amqp
. You can follow the status of this issue here.
EDIT 2: Fixed in spring-amqp 2.3.9
.
OK. I see what is going on. We fail on the AbstractMessageListenerContainer
which does not know yet that our MessageListener
is about a request-reply behavior. So, it silently handles an exception via its default ConditionalRejectingErrorHandler
, which, in turn, throws an AmqpRejectAndDontRequeueException
and that's it. The listener container comes back to the main loop for the next message.
You probably have to implement your own ConditionalRejectingErrorHandler
overriding its:
/**
* Called when a message with a fatal exception has an {@code x-death} header, prior
* to discarding the message. Subclasses can override this method to perform some
* action, such as sending the message to a parking queue.
* @param failed the failed message.
* @since 2.3
*/
protected void handleDiscarded(Message failed) {
To send a reply back with an error message.
Feel free to raise a GitHub issue, so we will think together how to deal with such a situation on the common level.