Tags: spring-mvc, jms, jms-topic, activemq-artemis

JMS 2.0 - How to receive messages from topic with shared consumers?


I am using ActiveMQ Artemis and JMS 2.0 to read topic messages with shared consumers. I have two questions:

  1. Is there any way to set this up with an XML configuration file?
  2. When I set the message listener on the consumer, is it mandatory to use a while loop? If I don't use a while (true) loop, the program terminates when the topic has no messages.

SharedConsumer.java

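// Imports assume the javax.jms (JMS 2.0) API and Apache Commons Lang 3 for StringUtils.
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.lang3.StringUtils;

public class SharedConsumer {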
    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    ConnectionFactory connectionFactory;

    public String maxConnectionForJSON;

    public void readFromTopicAndSendToQueue() throws Exception {
        Context initialContext = null;
        JMSContext jmsContext = null;
        int maxConnectionCount = 0;

        maxConnectionForJSON = "30";

        if (!StringUtils.isBlank(maxConnectionForJSON)){
            try{
                maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
            }catch (Exception e){
                //logging
            }
        }
        if (maxConnectionCount != 0) {
            try {
                List<JMSConsumer> jmsConsumerList = new ArrayList<>();
                initialContext = new InitialContext();

                Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");

                ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

                jmsContext = cf.createContext("admin", "admin");

                for (int i = 0; i < maxConnectionCount; i++){
                    JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
                    MessageListener listener = new Listener();
                    jmsConsumer.setMessageListener(listener);

                }
                while (true) {
                    Thread.sleep(30000);
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                if (initialContext != null) {
                    initialContext.close();
                }
                if (jmsContext != null) {
                    jmsContext.close();
                }
            }
        }
    }

    public static void main(final String[] args) throws Exception {
        SharedConsumer sharedConsumer = new SharedConsumer();
        sharedConsumer.readFromTopicAndSendToQueue();
    }
}

Listener.java

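// Imports assume the javax.jms (JMS 2.0) API.
import javax.jms.Message;
import javax.jms.MessageListener;

public class Listener implements MessageListener {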
    public static int count = 0;

    @Override
    public void onMessage(Message message) {
        System.out.println(message.toString() + "\ncount :" + count);
        count++;
    }

}

With JMS 1.1 (ActiveMQ) I could configure queue consumers through an XML file. I thought a config file like the one below would also work with JMS 2.0 and Artemis, but I was wrong. Thank you so much for your help, Justin Bertram.

JMS 1.1 configuration file

<bean id="brokerUrl" class="java.lang.String">
   <constructor-arg value="#{appProperties.queueUrl}"/>
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="maxConnections" value="#{appProperties.maxConnections}"/>
   <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
   <property name="maximumActiveSessionPerConnection" value="10"/>
</bean>

<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory1"/>
</bean>

<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
   <constructor-arg value="#{appProperties.queueName}"/>
</bean>

<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
               queue-capacity="0" rejection-policy="CALLER_RUNS"/>

<int:channel id="jmsInChannelForJSON" >
    <int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONQueue" channel="jmsInChannelForJSON"
                                        concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}" />

<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />

Solution

  • In short, yes: once you set a JMS consumer's message listener, it is normal to have to keep the program from terminating.

    When you create a JMS consumer and set its message listener, the JMS client implementation creates background threads that deliver messages asynchronously, independently of the thread that created the consumer and set the listener. That creating thread therefore simply carries on after setMessageListener returns. In your case something has to stop that thread from exiting and terminating the application, which is what the while loop does.
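
The blocking step described above can also be done without a sleep loop. The following is only a minimal sketch of that idea using the JDK alone, not Artemis code: `KeepAliveSketch`, `startDelivery` and `runUntil` are hypothetical names, and the daemon "delivery" thread is an assumed stand-in for the client's background listener threads. The main thread parks on a `CountDownLatch` until a stop condition releases it, then cleans up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class KeepAliveSketch {

    // Hypothetical stand-in for a JMS client: invokes the listener on a
    // background daemon thread, so it does not keep the JVM alive by itself.
    static ScheduledExecutorService startDelivery(Consumer<String> listener) {
        ScheduledExecutorService delivery =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "delivery");
                    t.setDaemon(true);
                    return t;
                });
        AtomicInteger seq = new AtomicInteger();
        delivery.scheduleAtFixedRate(
                () -> listener.accept("msg-" + seq.incrementAndGet()),
                0, 10, TimeUnit.MILLISECONDS);
        return delivery;
    }

    // Blocks the calling thread until `expected` messages were handled,
    // then stops delivery and returns the number of handled messages.
    static int runUntil(int expected) {
        CountDownLatch done = new CountDownLatch(expected);
        AtomicInteger handled = new AtomicInteger();
        ScheduledExecutorService delivery = startDelivery(msg -> {
            // Only the single delivery thread runs this, so the check is safe.
            if (done.getCount() > 0) {
                handled.incrementAndGet();
                done.countDown();
            }
        });
        try {
            done.await(); // replaces while (true) { Thread.sleep(30000); }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            delivery.shutdownNow(); // analogous to closing the JMSContext
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled " + runUntil(5) + " messages");
    }
}
```

In a real consumer the latch would more likely be released by whatever signals shutdown (for example an admin command) than by a message count; the count is used here only so the sketch terminates on its own.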