
How to pause and resume asynchronous consumption of JMS messages


I am building an application using ActiveMQ in which I have a producer and a consumer. In the consumer I use a MessageListener to receive messages from the producer asynchronously, through the onMessage(Message message) method. Before consuming a message, however, I want to check a condition and consume the message only if the check passes. I do not want to switch to synchronous consumption, because that would go against my design.

    void initialize() throws JMSException {
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue("testQ");
        this.consumer = session.createConsumer(destination);
        this.consumer.setMessageListener(this);
    }

This is where I want to check a condition, for example whether an internet connection is available, before the message is consumed:

    public void onMessage(final Message message) {
        Preconditions.checkNotNull(message);

        if (!(message instanceof TextMessage)) {
            _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
            throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not be handled");
        }

        try {
            final String messageType = message.getStringProperty("messageType");
            Preconditions.checkNotNull(messageType);
            _LOG.info("The MessageType is {}", messageType);

            final String msg = ((TextMessage) message).getText();
            Preconditions.checkNotNull(msg);
            _LOG.debug(msg);

            process(messageType, msg);
        } catch (final JMSException e) {
            _LOG.error("We could not read the message", e);
        }
    }

Any code example would be great.


Solution

  • It is not clear why you want to perform this check yourself: if there is a problem with the connection, your application is notified of the error. You can register an ExceptionListener on the JMS Connection; it is invoked when the provider detects a problem with the connection.

    The onMessage would not be called if there is a connection issue.

    Also, I would move the connection.start() call to after the message listener has been set on the consumer. Calling connection.start() tells the messaging provider that the application is ready to receive messages.

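    To pause message delivery, call connection.stop(); call connection.start() again to resume it. Note that stop() must not be called from inside onMessage, because it waits for running listeners to return; call it from another thread.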
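
The stop()/start() approach could be sketched roughly as follows. DeliveryGate and its Action interface are hypothetical names, not part of JMS or ActiveMQ; the two actions are assumed to be wired to connection::start and connection::stop, and update() is assumed to be called periodically from a thread other than the listener's with the result of your own condition check.

```java
/** Hypothetical helper: pauses or resumes delivery when a condition changes. */
final class DeliveryGate {

    /** Like Runnable, but allowed to throw (Connection.start()/stop() throw JMSException). */
    @FunctionalInterface
    interface Action {
        void run() throws Exception;
    }

    private final Action resume; // assumed wiring: connection::start
    private final Action pause;  // assumed wiring: connection::stop
    private boolean delivering;

    DeliveryGate(Action resume, Action pause, boolean initiallyDelivering) {
        this.resume = resume;
        this.pause = pause;
        this.delivering = initiallyDelivering;
    }

    /** Invokes resume or pause only when the condition actually changes. */
    synchronized void update(boolean conditionOk) {
        try {
            if (conditionOk && !delivering) {
                resume.run();
                delivering = true;
            } else if (!conditionOk && delivering) {
                pause.run();
                delivering = false;
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not toggle message delivery", e);
        }
    }

    synchronized boolean isDelivering() {
        return delivering;
    }
}
```

In the consumer from the question, this might be created as new DeliveryGate(connection::start, connection::stop, false) after the listener has been set, and driven by something like a ScheduledExecutorService task that calls gate.update(...) with the outcome of the connectivity check. The listener itself stays asynchronous and unchanged.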


    Update based on your response

    I suggest you use a session with client acknowledge mode.

    final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    

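    Then, in the onMessage method, don't acknowledge the message if there is no internet connection. An unacknowledged message is delivered again, provided it has not expired, once session.recover() is called or the session is closed; simply returning from onMessage does not trigger redelivery by itself.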
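
A minimal sketch of that decision, under stated assumptions: ConditionalAck and JmsCall are hypothetical names, and the acknowledge and recover arguments are assumed to be wired to message::acknowledge and session::recover from inside onMessage. The condition check is whatever connectivity test your application already has.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper sketching the acknowledge-or-recover decision. */
final class ConditionalAck {

    /** Stand-in for JMS calls that throw JMSException, e.g. message::acknowledge. */
    @FunctionalInterface
    interface JmsCall {
        void run() throws Exception;
    }

    /**
     * Processes and acknowledges the message when the condition holds;
     * otherwise leaves it unacknowledged and asks for redelivery.
     * Returns true if the message was acknowledged.
     */
    static boolean handle(BooleanSupplier conditionOk, Runnable process,
                          JmsCall acknowledge, JmsCall recover) {
        try {
            if (conditionOk.getAsBoolean()) {
                process.run();
                acknowledge.run(); // assumed wiring: message.acknowledge()
                return true;
            }
            recover.run(); // assumed wiring: session.recover()
            return false;
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("JMS call failed", e);
        }
    }
}
```

Calling recover immediately can cause the same message to come back right away while the condition still fails, so in practice this would probably be combined with a delay or with pausing the connection. It is also worth checking your provider's redelivery policy, since it may limit how often a message is redelivered.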