Search code examples
javajmsactivemq-classicjms-queue

Connection.start is not needed for JMS MessageProducer but needed for MessageConsumer


A - Question

I know there is a similar question but not the same in SO.

I'm trying to understand what goes on under the hood with MessageProducer and MessageConsumer in JMS. Using the implementation of ActiveMQ, I've written a simple MessageProducer example to send a message to queue, and a MessageConsumer example to consume the message from the queue, while running ActiveMQ locally.

Connection#start method is needed for sending a Message to Queue. The exact debug point is as follows. Connection#start triggers ActiveMQSession#start method. This method is triggered when a Connection#start is called. See the following debug point at org.apache.activemq.ActiveMQSession#start;

ActiveMQ Debug Point

The problem is, Connection#start is not explicitly needed on a MessageProducer but needed on a MessageConsumer. However, for both examples, we need to clear the resources (session and connection). What I realized is, if I remove Connection#start method on producer, the code will execute, debug point won't be triggered (not even under the hood) and I see the message in the queue. But if I remove Connection#start method on consumer, the code won't execute, that's the question, why it's not needed in MessageProducer and the code executes successfully but needed on MessageConsumer? Also why even we don't use Connection#start for MessageProducer even to the fact that we need to close the connection in order to flush the resources. It seems like code smells.

I see that field started is an AtomicBoolean. I'm not an expert on concurrency and multi-threading, so, may be there is a logic someone can explain why for a MessageProducer, a Connection#start is not mandatory;

org.apache.activemq.ActiveMQSession - started field

B - Example Code for JMS MessageProducer with ActiveMQ

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessageToQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello StackOverflow!";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Send Message to Queue
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);
        
        // Clear resources
        session.close();
        connection.close();
    }

}

C - Example Code for JMS MessageConsumer with ActiveMQ

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        
        connection.start();
        
        Message message = messageConsumer.receive(500);
        
        if ( message != null ) {
            if ( message instanceof TextMessage ) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }
        
        // Clear resources
        session.close();
        connection.close();
    }
    
}

D - Configuration And Maven Dependency

JDK version is 1.8, I'm running ActiveMQ 5.15.12 and also using the same version for the client dependency;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>

Solution

  • The behavior here is dictated by the JMS specification. Simply put, javax.jms.Connection.start() applies to consumers not producers. It tells the broker to begin delivering messages to the consumers associated with the connection. The JavaDoc for Connection says this:

    It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers have been created). At that point, the client calls the connection's start method, and messages begin arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from asynchronous message delivery while the client is still in the process of setting itself up.

    A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up.

    The start() method has no impact on producers. You're seeing the expected behavior.

    It's worth noting that this behavior is a bit different if you're using the simplified API which is part of JMS 2. If you use a JMSContext to create the a JMSConsumer then message delivery starts automatically. To be clear, ActiveMQ Classic doesn't fully implement JMS 2, but ActiveMQ Artemis does.