I'm using the RabbitMQ 3.6.10 UI to publish a message that is received by my Java application that uses Spring Integration AMQP 4.3.11. The message is a reply to an earlier message that was created using a Splitter, so it had a sequenceNumber
and sequenceSize
headers. I copy these headers to the reply and set them to the type Number
in the RabbitMQ UI. However, on the Java side, I'm getting an exception:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
at org.springframework.util.Assert.isTrue(Assert.java:92)
at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
... 14 more
I checked that the types of the sequenceNumber
and sequenceSize
headers on the Java side are Long
, instead of Integer
. There is however no option to make this difference in the RabbitMQ UI. The messages will be sent by a non-Java application, so how do I make sure the headers get recognised as Integer by Spring Integration?
When I publish the reply using a Java client and set the header values to Integer
, then the consumer accepts them. So this is probably a limitation of the RabbitMQ UI not having the header types specific enough (eg. 32-bit vs. 64-bit number) or Java client being too strict about the expected value type. Can anyone confirm one or the other?
Add a MessagePostProcessor
to the adapter's listener container...
@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
adapter.setOutputChannelName("someChannel");
return adapter;
}
@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
container.setQueueNames("foo");
container.setAfterReceivePostProcessors(m -> {
if (m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
sequenceNumber);
}
return m;
});
return container;
}
Please open a JIRA Issue - we should probably be more lenient, especially if the value is < Integer.MAX_VALUE
.