Given a JMS / SQS configuration like this:
private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
        new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
        AmazonSQSClientBuilder.defaultClient());

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.sqsConnectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}
and a receiver like this:
@Service
public class HandleMessage {
    @Transactional
    @JmsListener(destination = "${sqs.handler}")
    public void receive(String message) throws IOException, JMSException {
        ...
        if (message.contains("test")) {
            throw new JMSException("boom!");
        }
        ...
    }
}
I am finding that all the messages are being processed, and the message containing "test" vanishes instead of being retried. Is there perhaps something that needs to be changed in the SQS configuration?
The @Transactional annotation may or may not be required, but what I want is for Spring Boot to signal to SQS that the message failed when an exception is thrown, which I am sure is possible.
Maximum message size: 256 KB
Last updated: 5/25/2020, 12:00:10
Message retention period: 4 Days
Default visibility timeout: 30 Seconds
Messages available: 0
Delivery delay: 0 Seconds
Messages in flight (not available to other consumers): 0
Receive message wait time: 0 Seconds
Messages delayed: 0
Content-based deduplication: -
I hope that this answers your question, or at least gets you going in the right direction.
You will need to have a JTA transaction manager deployed as the Spring PlatformTransactionManager. There are many available; I have had good results with Arjuna (now developed as Narayana). Adding the starter:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>
will deploy Arjuna and configure it as the PlatformTransactionManager.
Helpful logging:
<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>
Remove this line from the listener container factory:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
and add:
factory.setSessionTransacted(true);
When the message listener container starts listening for a message, it should create a transaction context. When execution reaches your transactional method, that method should join the existing transaction context. If the method commits successfully, you should see the message receive being committed in the Arjuna logging. If there is an exception, the listener transaction is rolled back and your message is returned to the queue.
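The commit-or-rollback behaviour described above can be illustrated with a toy model. This is only a sketch of the semantics: ToyQueue and TransactedReceiveSketch are made-up names, the queue is an in-memory deque, and nothing here uses the Spring, JMS, or SQS APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class TransactedReceiveSketch {

    /** Toy in-memory queue; a stand-in for a broker, not for SQS or JMS. */
    public static final class ToyQueue {
        private final Deque<String> messages = new ArrayDeque<>();

        public void send(String message) {
            messages.addLast(message);
        }

        public int size() {
            return messages.size();
        }

        /**
         * Takes one message and runs the listener inside a pretend transaction.
         * Returns true if the receive was "committed", false if the queue was
         * empty or the listener threw and the receive was "rolled back".
         */
        public boolean receiveInTransaction(Consumer<String> listener) {
            String message = messages.pollFirst();
            if (message == null) {
                return false;
            }
            try {
                listener.accept(message);
                return true; // commit: the message is gone for good
            } catch (RuntimeException e) {
                messages.addLast(message); // rollback: the message is available again
                return false;
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("hello");
        queue.send("test message");

        List<String> processed = new ArrayList<>();
        Consumer<String> listener = message -> {
            if (message.contains("test")) {
                throw new IllegalStateException("boom!");
            }
            processed.add(message);
        };

        queue.receiveInTransaction(listener); // "hello" is committed
        queue.receiveInTransaction(listener); // "test message" is rolled back
        System.out.println("processed=" + processed + ", still queued=" + queue.size());
        // prints: processed=[hello], still queued=1
    }
}
```

The point of the model is only that a failed listener leaves the message on the queue, which is the behaviour the question is asking for.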
Just to muddy the waters: you only have the one transactional resource, the JMS session, so you could avoid using a full-blown JTA transaction manager (although I personally don't think that they are overly resource intensive). You can create a JmsTransactionManager and pass your connection factory into it. Then you can set the JmsTransactionManager on the message listener container factory as the transaction manager.
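A minimal sketch of that JmsTransactionManager wiring might look like the following. The class name JmsTransactionSketchConfig is hypothetical, and the sketch assumes the connection factory is exposed as a ConnectionFactory bean (in the question it is a private field) and that javax.jms is on the classpath. It also assumes the JMS provider accepts transacted sessions, which is worth checking against the SQS JMS library's documentation before relying on it.

```java
import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.support.destination.DynamicDestinationResolver;

@Configuration
public class JmsTransactionSketchConfig { // hypothetical class name

    // Local JMS transaction manager wrapping the single transactional resource.
    // Assumes a ConnectionFactory bean is available for injection.
    @Bean
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory,
            JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        // Transacted session instead of CLIENT_ACKNOWLEDGE, as suggested above.
        factory.setSessionTransacted(true);
        // The listener container begins and ends transactions through this manager.
        factory.setTransactionManager(jmsTransactionManager);
        return factory;
    }
}
```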