Search code examples
amazon-web-servicesmultithreadingjmsamazon-sqsaws-sdk-java

AWS SQS JMS SDK MessageListener onMessage() Not Assigning Different Thread For Each Invocation


I have a requirement where I have to write a sqs consumer which consumes messages asynchronously from AWS SQS. My assumption is that JMS is multi threaded and for each invocation of MessageListener's onMessage(), it will assign a new thread to it.

SQSConnectionManager.java:

public class SQSConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSConnectionManager.class);

    private SQSConnectionFactory sqsConnectionFactory;
    private SQSConnection sqsConnection;
    private Session sqsSession;

    public SQSConnectionManager() {
    }

    public void createSQSConnection(final String queueName) throws JMSException {

        LOGGER.info("Initializing sqs connection");
        sqsConnectionFactory = new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .build()
        );

        sqsConnection = sqsConnectionFactory.createConnection();

        sqsSession = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = sqsSession.createQueue(queueName);

        MessageConsumer sqsConsumer = sqsSession.createConsumer(queue);

        sqsConsumer.setMessageListener(new MyCustomListener());

        sqsConnection.start();
        LOGGER.info("SQS Connection started");
    }
}

MyCustomListener.java:

public class MyCustomListener implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomListener.class);

    public MyCustomListener() {}

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("onMessage() Thread name : {}", Thread.currentThread().getName());
            LOGGER.info("onMessage() Thread id : {}", Thread.currentThread().getId());
            LOGGER.info("Reading incoming sqs message");
            final SQSTextMessage sqsTextMessage = (SQSTextMessage) message;
            final String receivedMessage = sqsTextMessage.getText();
            LOGGER.info("Received sqs message : {}", receivedMessage);
            helper(receivedMessage);
        } catch (JMSException e) {
            LOGGER.error("Failed to read incoming sqs message : {}", e.getCause());
        }
    }

    private void helper(final String sqsMessage) {
        LOGGER.info("helper() Thread name : {}", Thread.currentThread().getName());
        LOGGER.info("helper() Thread id : {}", Thread.currentThread().getId());
        LOGGER.info("sqs message : {}", sqsMessage);
    }
}

Application.java

public class Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    
    public static void main(String[] args) throws Exception {
        SQSConnectionManager sqsConnectionManager = new SQSConnectionManager();
        sqsConnectionManager.createSQSConnection("test-queue");
    }
}

AWS Maven Dependency:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.1.0</version>
</dependency>

This Java app is deployed to AWS as elastic bean stalk application.

When I check the logs in Cloud Watch I see same thread id for both onMessage() and helper():

CloudWatch Log

Can anyone please help me understand how does JMS Listener handle threading concept? Does it ensure execution is multi-threaded?


Solution

  • You're assumption about multi-threaded support in JMS is incorrect. In particular, the Session and MessageConsumer objects are not thread-safe. Asynchronous message consumption by a MessageListener is serial (i.e. not parallel/concurrent) so you're seeing the expected behavior.

    This is described in more detail in section 2.14 of the Jakarta Messaging 3.1 specification. This explanation is provided for imposing concurrency limits on the Session:

    There are two reasons for restricting concurrent access to sessions. First, sessions are the Jakarta Messaging entity that supports transactions. It is very difficult to implement transactions that are multi-threaded. Second, sessions support asynchronous message consumption. It is important that Jakarta Messaging not require that client code used for asynchronous message consumption be capable of handling multiple, concurrent messages. In addition, if a session has been set up with multiple, asynchronous consumers, it is important that the client is not forced to handle the case where these separate consumers are concurrently executing. These restrictions make Jakarta Messaging easier to use for typical clients. More sophisticated clients can get the concurrency they desire by using multiple sessions. In the classic API and the domain-specific APIs this means using multiple session objects. In the simplified API this means using multiple JMSContext objects.

    Therefore, if you want concurrent consumption in your case then you need to create additional sessions and consumers.

    That said, you can certainly manage the concurrency yourself if you like (e.g. using the classes in java.util.concurrent). That is fine for simple use-cases involving AUTO_ACKNOWLEDGE. However, if you ever move beyond such a simple use-case to something more complex involving CLIENT_ACKNOWLEDGE or transactions then your code will get more complicated because you'll have to manage thread-safe access to the Session object to deal with acknowledgements, commits, roll-backs, etc. Creating multiple sessions and consumers is often simpler to implement and more straight-forward to maintain.