Configuring sessionAcknowledgeMode in DefaultMessageListenerContainer


I have a setup where I have to read a message from a queue in an ActiveMQ broker. Once the message is read I have to do a long-running operation on the message.

Due to this long-running operation on the message I want to acknowledge the message as soon as possible so the resources on the broker are released. The plan would be to execute the following steps once a message is received:

  1. Get message from ActiveMQ
  2. Insert message into DB
  3. Acknowledge message
  4. Do some long-running operation with the message
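
The order of the four steps above can be sketched without a broker. This is only a model of the intended flow, not Spring or ActiveMQ code: `AckEarlyPipeline`, `AckableMessage`, `runDemo` and the event strings are all hypothetical names, and the "DB insert" is just a callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Broker-free model of the four steps; every name here is hypothetical. */
final class AckEarlyPipeline {

    /** Stand-in for a JMS message that is acknowledged explicitly. */
    interface AckableMessage {
        String body();
        void acknowledge();
    }

    private final Consumer<String> store;           // stand-in for the DB insert
    private final Consumer<String> longRunningTask; // stand-in for the slow operation
    private final ExecutorService workers;

    AckEarlyPipeline(Consumer<String> store, Consumer<String> longRunningTask,
                     ExecutorService workers) {
        this.store = store;
        this.longRunningTask = longRunningTask;
        this.workers = workers;
    }

    void onMessage(AckableMessage message) {
        String body = message.body();                       // 1. get the message
        store.accept(body);                                 // 2. persist; an exception here skips the ack
        message.acknowledge();                              // 3. acknowledge as early as possible
        workers.submit(() -> longRunningTask.accept(body)); // 4. slow work, off the receiving thread
    }

    /** Sends one message through the pipeline and returns the observed order of events. */
    static List<String> runDemo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService workers = Executors.newSingleThreadExecutor();
        AckEarlyPipeline pipeline = new AckEarlyPipeline(
                body -> events.add("persisted " + body),
                body -> events.add("processed " + body),
                workers);
        pipeline.onMessage(new AckableMessage() {
            @Override public String body() { return "hello"; }
            @Override public void acknowledge() { events.add("acknowledged"); }
        });
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [persisted hello, acknowledged, processed hello]
    }
}
```

Because the acknowledgement happens only after the insert succeeds, a failed insert leaves the message unacknowledged, while a failure in step 4 can no longer trigger a redelivery and would have to be recovered from the DB row instead.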

I've read about JMS and the different acknowledge modes, so before trying this I set up an application where I could try the different modes and understand how they are processed. Unfortunately, I cannot get my desired output.

Following the information in this answer https://stackoverflow.com/a/10188078 as well as https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html I thought that by using AUTO_ACKNOWLEDGE the message would be acknowledged before my listener is even called, but if I throw an exception in the listener the message is redelivered.

I've tried both with and without setting sessionTransacted to true, and in both cases I get the same result: the message is redelivered when an exception is thrown in the JmsListener.

Configuration of JMS

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());

        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmstemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        //jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        //factory.setConcurrency("1");
        factory.setSessionTransacted(true);

        configurer.configure(factory, connectionFactory);
        return factory;
    }

JmsListener

    @JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
    public void receiveMessage(Message message) {
        try {
            TextMessage m = (TextMessage) message;
            String messageText = m.getText();

            // JMSXDeliveryCount is a message property; 1 means first delivery
            int retryNum = message.getIntProperty("JMSXDeliveryCount");
            // JMSTimestamp is a header, so read it through its accessor
            long s = message.getJMSTimestamp();

            Date d = new Date(s);

            String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);

            // Simulate a failing listener to observe the redelivery behaviour
            if (messageText.toLowerCase().contains("exception")) {
                logger.info("Creating exception for retry: {}", retryNum);
                throw new RuntimeException();
            }
        } catch (JMSException e) {
            logger.error("Exception!!", e);
        }
    }

How should I change the code so that the message is not redelivered when an exception is thrown?

Going back to my application, where I would be inserting the message into the DB: how could I acknowledge the message in my JmsListener after it is inserted in the DB but before executing the long-running task?


Solution

  • To use AUTO_ACKNOWLEDGE or CLIENT_ACKNOWLEDGE, I had to call factory.setSessionTransacted(false) after configuring the factory.

    Calling configurer.configure(factory, connectionFactory) overrides the value of sessionTransacted. In my case it set it to true, which rendered AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE ineffective, because a transacted session ignores the acknowledge mode. Here's the relevant code of DefaultJmsListenerContainerFactoryConfigurer.java:

    public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
        ...
        ...
        if (this.transactionManager != null) {
            factory.setTransactionManager(this.transactionManager);
        } else {
            factory.setSessionTransacted(true);
        }
        ...
        ...
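
    A minimal sketch of how the fix could look, for illustration only. It assumes the javax.jms API (newer Spring Boot versions use jakarta.jms instead), and the names AckEarlyJmsConfig, clientAckListenerFactory, AckEarlyListener, saveToDatabase and runLongOperation are hypothetical. Both settings are applied after configurer.configure(...) so that the configurer cannot override them. CLIENT_ACKNOWLEDGE is used here to acknowledge right after the DB insert; with AUTO_ACKNOWLEDGE and a non-transacted session, the container should instead acknowledge before the listener runs.

```java
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;

@Configuration
class AckEarlyJmsConfig {

    @Bean
    public JmsListenerContainerFactory<?> clientAckListenerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        // Apply the Spring Boot defaults first; this may set sessionTransacted to true
        configurer.configure(factory, connectionFactory);

        // Override afterwards so the acknowledge mode actually takes effect
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}

@Component
class AckEarlyListener {

    @JmsListener(destination = "B1Q1", containerFactory = "clientAckListenerFactory")
    public void receiveMessage(TextMessage message) throws JMSException {
        String text = message.getText();

        saveToDatabase(text);    // hypothetical DB insert; a failure here skips the ack
        message.acknowledge();   // acknowledge before the slow part starts
        runLongOperation(text);  // hypothetical long-running work
    }

    private void saveToDatabase(String text) {
        // placeholder for the real insert
    }

    private void runLongOperation(String text) {
        // placeholder for the real processing
    }
}
```

    With this arrangement an exception thrown from runLongOperation happens after the acknowledgement, so the broker should not redeliver the message. Any retry would have to be driven from the row stored in the DB.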