Search code examples
javarabbitmqspring-integrationspring-integration-amqp

The 'sequenceNumber' header value must be an Integer. But it is Long


I'm using the RabbitMQ 3.6.10 UI to publish a message that is received by my Java application that uses Spring Integration AMQP 4.3.11. The message is a reply to an earlier message that was created using a Splitter, so it had a sequenceNumber and sequenceSize headers. I copy these headers to the reply and set them to the type Number in the RabbitMQ UI. However, on the Java side, I'm getting an exception:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
    at org.springframework.util.Assert.isTrue(Assert.java:92)
    at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
    at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
    at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    ... 14 more

I checked that the types of the sequenceNumber and sequenceSize headers on the Java side are Long, instead of Integer. There is however no option to make this difference in the RabbitMQ UI. The messages will be sent by a non-Java application, so how do I make sure the headers get recognised as Integer by Spring Integration?

When I publish the reply using a Java client and set the header values to Integer, then the consumer accepts them. So this is probably a limitation of the RabbitMQ UI not having the header types specific enough (eg. 32-bit vs. 64-bit number) or Java client being too strict about the expected value type. Can anyone confirm one or the other?


Solution

  • Add a MessagePostProcessor to the adapter's listener container...

    @Bean
    public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
        adapter.setOutputChannelName("someChannel");
        return adapter;
    }
    
    @Bean
    public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames("foo");
        container.setAfterReceivePostProcessors(m -> {
            if (m.getMessageProperties().getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
                Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
                        .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
                m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
                        sequenceNumber);
            }
            return m;
        });
        return container;
    }
    

    Please open a JIRA Issue - we should probably be more lenient, especially if the value is < Integer.MAX_VALUE.