Search code examples
spring-integrationspring-cloud-aws

How do I use the sqs-message-driven-channel-adapter in spring-integration-aws


EDIT: Here is a gist showing my log. It appears that there is ReceiveMessage and then a preSend on inputChannel:

https://gist.github.com/louisalexander/04e7d95835521efdd15455c98075e2ea

Apologies for being so dense, but I can't seem to figure out how to properly make use of the sqs-message-driven-channel-adapter

In my context file, I am configuring it as such:

<int-aws:sqs-message-driven-channel-adapter
    id="my-message-driven-adapter" sqs="sqs" queues="some-queue-of-mine"
    max-number-of-messages="5" visibility-timeout="200" wait-time-out="10"
    send-timeout="2000" channel="inputChannel" />

I observe that messages are properly making it into some-queue-of-mine (by removing the above bit of code and sending messages to the queue). I then restart my server, enabling the message driven adapter and I observe that all the messages are consumed from the queue, but where did they go? :-/

My expectation was that the messages would be funneled into a DirectChannel named inputChannel:

<int:channel id="inputChannel"/>

That I have a service-activator consuming from as follows:

<int:service-activator ref="myConsumer"
    method='execute' input-channel="inputChannel" output-channel="outputChannel">
    <int:request-handler-advice-chain>
        ...
    </int:request-handler-advice-chain>
</int:service-activator>

But of course, I am never seeing myConsumer get invoked. I imagine my understanding of how the MessageProducer mechanism works is inadequate. Can someone please correct my thinking by providing a trivial example of XML wiring?


Solution

  • According to the Logs the message is consumed by the handler.AbstractMessageHandler (AbstractMessageHandler.java:115) - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@493f49cd]. Although that might be a fully different story.

    SqsMessageDrivenChannelAdapter can be supplied with the errorChannel to handle downstream exceptions. By default it is only logged.

    The message sent from that adapter is like:

    return new GenericMessage<>(message.getBody(), new SqsMessageHeaders(messageHeaders));
    

    Where that message.getBody() is String. See QueueMessageUtils.createMessage().

    So, be sure that your service-activator accepts that String as a paylaod and not any other type.