I'm working on integrating Spring Integration with AWS SQS queue.
I have an issue when my method annotated with @ServiceActivator
throws an exception. It seems that in such cases the message is removed from the queue anyway. I've configured MessageDeletionPolicy
to ON_SUCCESS
in SqsMessageDrivenChannelAdapter
.
Here is my channel/adapter configuration https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java
I've tried doing the same using @SqsListener
annotation and messages are not deleted as expected.
I've created a mini Spring Boot app here to demonstrate this issue: https://github.com/sdusza1/spring-integration-sqs
Please help :)
Your configuration is like this:
@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
adapter.setOutputChannel(inboundChannel());
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
return adapter;
}
Where the inboundChannel
is like this:
@Bean
public QueueChannel inboundChannel() {
return new QueueChannel();
}
So, this is a queue, therefore async and the message from that queue is processed on a separate thread by the TaskScheduler
which polls this kind of channel according your PollerMetadata
configuration. In this case any errors in the consumer are thrown into that thread as well and don't reach the SqsMessageDrivenChannelAdapter
for expected error handling.
This technically is fully different from your @SqsListener
experience which is really called directly on the container thread, and therefore its error handling is applied.
Or you need to revise your logic how you would like to handle errors in that separate thread or just don't use a QueueChannel
just after SqsMessageDrivenChannelAdapter
and let it throw and handle errors in the underlying SQS Listener Container as it is in case of @SqsListener
.