Search code examples
jakarta-eejmsactivemq-classic

JMS setMessageListener - with JMSContext not work


I have a simple producer & consumer application using ActiveMQ Classic 6.0.1.

This is the producer:

ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);

// Create a Connection
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the Topic
Destination destination = session.createQueue(TOPIC_NAME);

// Create a MessageProducer
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// Create a TextMessage and publish it
TextMessage message = session.createTextMessage("Hello, JMS Topic!");
producer.send(message);
System.out.println("Message sent");

// Clean up
session.close();
connection.close();

The message is indeed sent to the queue.

Now, I have two ways to consume the message, the 1st:

public class MyListener implements MessageListener {

    private static final Logger logger = Logger.getLogger(WebServer.class.getName());
    private static final String BROKER_URL = "tcp://messaging:61616";
    private static final String TOPIC_NAME = "TEST.FOO";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        // Create a connection
        Connection connection = connectionFactory.createConnection("admin", "admin");
        connection.start();

        // Create a session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a destination (topic or queue)
        Destination destination = session.createQueue(TOPIC_NAME);

        // Create a consumer
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MyListener());

        // INFINIT LOOP
    }

    @Override
    public void onMessage(Message message) {
        System.out.println(message);
    }
}

This way the messages are indeed printed to console.

The 2nd way is not working (using JMSContext):

public class MyListener implements MessageListener{

    private static final Logger logger = Logger.getLogger(WebServer.class.getName());
    private static final String BROKER_URL = "tcp://messaging:61616";
    private static final String TOPIC_NAME = "TEST.FOO";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);


        try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
            JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
            consumer.setMessageListener(new MyListener());
            context.start();
        }

        // INFINIT LOOP
    }

    @Override
    public void onMessage(Message message) {
        System.out.println(message);
    }
}

In the INFINIT LOOP code I do embedded Jetty start and thread join. I tried with Thread.sleep() as well.

The two application are similar, runs over embedded Jetty and using JPA (MariaDB/EclispeLink), Jersey JAX-RS and Weld-SE as CDI implementation.

Any idea why the 2nd way (I think it JMS spec 2.0) is not working?

As I think ActiveMQ classic 6.0.1 support JMS 2.0.


Solution

  • The problem is your use of try-with-resources, i.e.:

    try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
        JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
        consumer.setMessageListener(new MyListener());
        context.start();
    }
    

    As soon as this block ends the JMSContext you created is closed preventing your MessageListener implementation from actually receiving any messages. You don't have this problem in your other code because you don't use a try-with-resources block and you never explicitly close the connection.

    You need to either eliminate the try-with-resources block or put your INFINIT WAIT code inside of it.

    Also, there's no need to invoke context.start() as that's done automatically. This behavior can be configured via the setAutoStart method.

    Try something like this:

    public class MyListener implements MessageListener {
    
        private static final Logger logger = Logger.getLogger(WebServer.class.getName());
        private static final String BROKER_URL = "tcp://messaging:61616";
        private static final String TOPIC_NAME = "TEST.FOO";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
    
            try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
                JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
                consumer.setMessageListener(new MyListener());
                // INFINIT LOOP 
            }
        }
    
        @Override
        public void onMessage(Message message) {
            System.out.println(message);
        }
    }
    

    Lastly, it's worth noting that ActiveMQ Classic doesn't yet fully implement JMS 2. You'll have to use ActiveMQ Artemis if you want a full implementation.