spring-integration rabbitmq-exchange spring-rabbit

How to get header from RabbitMQ with spring integration


To use a delayed exchange, I am sending messages to RabbitMQ through an "int:gateway" with this method:

void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey);

I can see in RabbitMQ that the header is set correctly: x-delay: -60000

But how can I read this header when I receive the message from RabbitMQ?

So far I receive the object that I sent as JSON, but as soon as I try to read the header I get an exception.

Sending:

integration.xml file:

<!-- Producing service -->
    <int:gateway id="gateway" default-request-channel="producingChannel" service-interface="Gateway"/>
    <!-- Producing service -->


<!-- Service => RabbitMQ (Producing) -->
    <int:chain input-channel="producingChannel">
        <int:object-to-json-transformer/>
        <int-amqp:outbound-channel-adapter exchange-name="${queuing.notifications-exchange}" routing-key-expression="headers.routingKey" mapped-request-headers="*"/>
    </int:chain>
    <!-- Service => RabbitMQ (Producing) -->

Gateway in java file:

void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey);

Receiving:

integration.xml file:

<!-- RabbitMQ => Service (Consuming) -->
    <int-amqp:inbound-channel-adapter channel="consumingChannel" queue-names="${queuing.operator.queue}" concurrent-consumers="${queuing.concurrent-consumers}" prefetch-count="${queuing.prefetch-count}" mapped-request-headers="*" error-channel="errorChannel" />
    <!-- RabbitMQ => Service (Consuming) -->


<!-- Routing -->
<int:chain input-channel="consumingChannel">
    <int:json-to-object-transformer type="Notification"/>
    <int:service-activator ref="workingService" method="processNotificationFromQueue"/>
</int:chain>
<!-- Routing -->

WorkingService in java file:

public void processNotificationFromQueue(Notification notification,
            @Header(MessageProperties.X_DELAY) Integer delay) { ...
 }

The exception is thrown here:

Caused by: java.lang.IllegalArgumentException: required header not available: x-delay

Solution

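  • Use AmqpHeaders.RECEIVED_DELAY instead of MessageProperties.X_DELAY, that is, declare the parameter as @Header(AmqpHeaders.RECEIVED_DELAY) Integer delay.

    Your mapped-request-headers="*" setting is correct. On the receiving side the delay is not exposed as a plain x-delay header; the default DefaultAmqpHeaderMapper reads it from the AMQP message properties and maps it like this: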

    Integer receivedDelay = amqpMessageProperties.getReceivedDelay();
    if (receivedDelay != null) {
        headers.put(AmqpHeaders.RECEIVED_DELAY, receivedDelay);
    }
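
To illustrate why the lookup by x-delay fails on the consuming side, here is a small, library-free sketch of the inbound mapping. It is a simplified model and not the actual Spring code: the class name, the mapInbound helper, and the "notifications" routing key are made up for illustration, and the two string constants are assumed to carry the same values as MessageProperties.X_DELAY and AmqpHeaders.RECEIVED_DELAY.

```java
import java.util.HashMap;
import java.util.Map;

class ReceivedDelayMappingSketch {

    // Assumed to equal MessageProperties.X_DELAY and AmqpHeaders.RECEIVED_DELAY.
    static final String X_DELAY = "x-delay";
    static final String RECEIVED_DELAY = "amqp_receivedDelay";

    // Hypothetical stand-in for the inbound mapping: the broker's x-delay value
    // is stored under a dedicated key instead of being copied as a plain header.
    static Map<String, Object> mapInbound(Map<String, Object> brokerHeaders) {
        Map<String, Object> headers = new HashMap<>();
        for (Map.Entry<String, Object> entry : brokerHeaders.entrySet()) {
            if (X_DELAY.equals(entry.getKey()) && entry.getValue() instanceof Integer) {
                headers.put(RECEIVED_DELAY, entry.getValue());
            } else {
                headers.put(entry.getKey(), entry.getValue());
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> brokerHeaders = new HashMap<>();
        brokerHeaders.put(X_DELAY, -60000);               // value seen in RabbitMQ
        brokerHeaders.put("routingKey", "notifications"); // hypothetical user header

        Map<String, Object> headers = mapInbound(brokerHeaders);

        System.out.println(headers.containsKey(X_DELAY)); // prints "false"
        System.out.println(headers.get(RECEIVED_DELAY));  // prints "-60000"
    }
}
```

In this model a required lookup of x-delay finds nothing, which matches the exception from the question, while the same value is available under the received-delay key.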