Search code examples
activemq-classicspring-jms

Consumer is not receiving messages from ActiveMQ


We are facing a random issue with ActiveMQ and its consumers. We observe that, few consumers are not receiving messages, even though they are connected to ActiveMQ queue. But it works fine after the consumer restart.

We have a queue named testQueue at ActiveMQ side. A consumer is trying to de-queue the messages from this queue. We are using Spring's DefaultMessageListenerContainer for this purpose. Message is being delivered to the consumer node from ActiveMQ Broker. From the tcpdump as well, it was obvious that, message is reaching the consumer node, But the actual consumer code is not able to see the message. In other words, the message seems to be stuck either in ActiveMQ consumer code or in Spring’s DefaultMessageListenerContainer.

See refer to the below fig. for more clarity on the issue. Message is reaching Consumer node, but it is not reaching the “Actual Consumer Class”, which means that the message got stuck either in AMQ consumer code or Spring DMLC.

enter image description here

Below are the details captured from ActiveMQ admin.

Queue-Name /Pending-Message-Count /Consumer-Count /Messages-Enqueued /Messages-Dequeued testQueue /9 /1 /9 /0

Below are the more details.

Connection-ID /SessionId /Selector /Enqueues /Dequeues /Dispatched /Dispatched-Queue /Prefetch ID:bearsvir52-45176-1375519181268-3:5 /1 / /9 /0 /9 /9 /250

From the second table it is obvious that, messages are being delivered to the consumer, but the consumer is not acknowledging the message. Hence the messages are stuck in Dispatched-Queue at broker side.

Few points for to your notice:

1)There is no time difference b/w Broker node and consumer node.

2)Observed the tcpdump at consumer side. We can see MessageDispatch(Openwire) packet being transferred to consumer node, But could not find the MessageAck(Openwire) for the same.

3)Sometimes it is working on a node, and sometimes it is creating problem on the same node.


Solution

  • It took lot of time to figure out the solution. There seems to be some issue with the org.apache.activemq.ActiveMQConnection.java class, in case of AMQ fail over. The connection object is not getting started at consumer side in such cases.

    Following is the fix i have added in ActiveMQConnection.java file and compiled the sources to create activemq-core-x.x.x.jar

    private final Object startMutex = new Object();
    

    added a check in createSession method

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        synchronized (startMutex) {
            if(!isStarted()) {
                start();
            }
        }