Tags: jms, activemq-artemis, jms-topic, jms2

ActiveMQ Artemis consumers are sometimes not receiving messages


I'm facing a strange issue with ActiveMQ Artemis 2.32.0. I have 2 consumers which are running on two separate threads. The producer publishes a message to a JMS topic. Sometimes one of the two consumers seems to be stuck because there is no message sent to that consumer. However, this is not happening every time. Sometimes I can see both the consumers received the message and the flow ended as expected. Below are my consumers (Tommy, Harry) and producer (WeatherChannel).

Tommy.java:

public class Tommy implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Tommy.class);
    
    public void receiveMessage() throws NamingException {
        // Create a new initial context, which loads from jndi.properties file
        Context context = new InitialContext();
        
        // Lookup an existing Destination which is a topic in our example
        Topic topic = (Topic)context.lookup("jms/test/topic"); 
        
        // Resources declared in a try-with-resources block are closed automatically at the end of the block.
        try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
                JMSContext jmsContext = connectionFactory.createContext()) {
            
            // Create a consumer and receive the String body directly (i.e. without casting from Message)
            String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
            logger.info("Message received by Tommy >>> {}", messageReceived);
        }
    }

//  public static void main(String[] args) throws NamingException, InterruptedException {
//      receiveMessage();
//  }

    @Override
    public void run() {
        try {
            receiveMessage();
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Harry.java:

public class Harry implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Harry.class);
    
    public static void receiveMessage() throws NamingException {
        // Create a new initial context, which loads from jndi.properties file
        Context context = new InitialContext();
        
        // Lookup an existing Destination which is a topic in our example
        Topic topic = (Topic)context.lookup("jms/test/topic"); 
        
        // Resources declared in a try-with-resources block are closed automatically at the end of the block.
        try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
                JMSContext jmsContext = connectionFactory.createContext()) {
            
            // Create a consumer and receive the String body directly (i.e. without casting from Message)
            String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
            logger.info("Message received by Harry >>> {}", messageReceived);
        }
    }

//  public static void main(String[] args) throws NamingException, InterruptedException {
//      receiveMessage();
//  }

    @Override
    public void run() {
        try {
            receiveMessage();
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

WeatherChannel.java:

public class WeatherChannel {
    private static final Logger logger = LoggerFactory.getLogger(WeatherChannel.class);
    
    public void broadcastMessage() throws NamingException, JMSException, InterruptedException {
        // Create a new initial context, which loads from jndi.properties file
        Context context = new InitialContext();
        
        // Lookup an existing Destination which is a topic in our example
        Topic topic = (Topic)context.lookup("jms/test/topic");
        
        // Resources declared in a try-with-resources block are closed automatically at the end of the block.
        try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
                JMSContext jmsContext = connectionFactory.createContext()) {
            
            //Create producer and send message on the fly
            jmsContext.createProducer().send(topic, "Today's weather at Kolkata is pleasant with max temp 27 C");
            logger.info("Message sent successfully by producer");
            
        }
    }
}

This is how I invoke the consumers and producer from a main method:

new Thread(new Tommy()).start();
new Thread(new Harry()).start();
new WeatherChannel().broadcastMessage();

Finally, I ran each consumer from its own main method and got rid of the threads. This time I don't see the issue and both consumers receive the message successfully. Can someone point out where I went wrong?


Solution

  • This is most likely a result of the consumer threads not being fully up and running by the time the producer thread sends the message. A JMS Topic doesn't retain messages for consumers that are not online at the time they are sent unless there is an existing durable Topic subscription.

    Since a Topic is a broadcast mechanism and not a Queue, you need to ensure that any consumer you want to receive a given message is online prior to the send. In your simple example this could be accomplished by passing a latch or other waitable resource to the objects when they are created and waiting on that latch before sending. Each consumer thread could perform a latch countdown after the consumer is created but prior to calling receive.
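
The latch idea described above can be sketched roughly as follows. To keep the example runnable without a broker, it does not use JMS at all: `FakeTopic` is a hypothetical in-memory stand-in that, like a non-durable topic, only delivers to subscriptions that already exist at send time. The names `FakeTopic`, `Listener`, `LatchedTopicDemo` and `runDemo` are illustrative assumptions and not part of the original code or of Artemis.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class LatchedTopicDemo {

    /** Hypothetical stand-in for a non-durable topic (NOT a JMS or Artemis class). */
    static class FakeTopic {
        private final List<BlockingQueue<String>> subscriptions = new CopyOnWriteArrayList<>();

        // Plays the role of jmsContext.createConsumer(topic).
        BlockingQueue<String> subscribe() {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            subscriptions.add(queue);
            return queue;
        }

        // Plays the role of the producer's send: only existing subscriptions get a copy.
        void publish(String message) {
            for (BlockingQueue<String> queue : subscriptions) {
                queue.add(message);
            }
        }
    }

    /** Consumer thread body: subscribe, signal readiness, then block on receive. */
    static class Listener implements Runnable {
        private final String name;
        private final FakeTopic topic;
        private final CountDownLatch subscribed;
        private final List<String> received;

        Listener(String name, FakeTopic topic, CountDownLatch subscribed, List<String> received) {
            this.name = name;
            this.topic = topic;
            this.subscribed = subscribed;
            this.received = received;
        }

        @Override
        public void run() {
            BlockingQueue<String> consumer = topic.subscribe(); // 1. create the consumer
            subscribed.countDown();                             // 2. tell the producer we are ready
            try {
                String body = consumer.poll(5, TimeUnit.SECONDS); // 3. blocking receive with timeout
                if (body != null) {
                    received.add(name + ": " + body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts two consumers, waits for both subscriptions, then publishes once. */
    static List<String> runDemo() {
        FakeTopic topic = new FakeTopic();
        CountDownLatch subscribed = new CountDownLatch(2); // one count per consumer
        List<String> received = new CopyOnWriteArrayList<>();

        Thread tommy = new Thread(new Listener("Tommy", topic, subscribed, received));
        Thread harry = new Thread(new Listener("Harry", topic, subscribed, received));
        tommy.start();
        harry.start();
        try {
            // The producer side waits here until both consumers have subscribed.
            if (!subscribed.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers were not ready in time");
            }
            topic.publish("Today's weather at Kolkata is pleasant with max temp 27 C");
            tommy.join();
            harry.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Applied to the code in the question, the same shape would presumably mean passing a shared `CountDownLatch` into `Tommy` and `Harry`, storing the result of `jmsContext.createConsumer(topic)` in a `JMSConsumer` variable, calling `countDown()` on the latch, and only then calling `receiveBody(String.class)`. The main method would call `await()` on the latch before `broadcastMessage()`.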