Search code examples
java apache-camel activemq-classic jmx simplejmx

How to get the number of messages processed by a queue created with ActiveMQ in Java?


We have created routes with Apache Camel, in which we use ActiveMQ to publish messages to and consume messages from queues.

Please find below the ActiveMQ configuration used:

<!-- Camel ActiveMQ component "activemqT3"; delegates its JMS settings to jmsConfigT3. -->
<bean id="activemqT3" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfigT3"/>
    <property name="transacted" value="true"/>
</bean>

<!-- JMS configuration shared by the component: transacted, 10 concurrent consumers, consumer-level caching. -->
<bean id="jmsConfigT3" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
    <property name="connectionFactory" ref="pooledConnectionFactoryT3"/>
    <property name="concurrentConsumers" value="10"/>
    <property name="receiveTimeout" value="60000"/>
    <property name="transacted" value="true"/>
</bean>

<!-- Connection pool (at most 8 connections) wrapping the raw ActiveMQ factory; started/stopped with the context. -->
<bean id="pooledConnectionFactoryT3" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="8"/>
    <property name="connectionFactory" ref="jmsConnectionFactoryT3"/>
</bean>

<!-- Raw ActiveMQ connection factory; broker URL and credentials come from property placeholders. -->
<bean id="jmsConnectionFactoryT3" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${com.integration.amq.brokerURL}"/>
    <property name="userName" value="${com.integration.activemq.username}"/>
    <property name="password" value="${com.integration.activemq.password}"/>
    <property name="prefetchPolicy" ref="prefetchPolicyT3"/>
    <property name="redeliveryPolicy">
        <!-- maximumRedeliveries is set to 0 for this connection factory. -->
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="maximumRedeliveries" value="0"/>
        </bean>
    </property>
</bean>

<!-- Prefetch policy with queuePrefetch set to 0. -->
<bean id="prefetchPolicyT3" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="0"/>
</bean>

Now, using an independent Java program, we want to know how many messages have been published to and consumed from each queue. Can you please share a sample program or example?

Tried the Java code below:

    String url = "service:jmx:rmi:///jndi/rmi://abcdefdv302.attpmerd.com:1099/jmxrmi";
    // JMXConnector is Closeable: try-with-resources releases the remote connection
    // even if one of the calls below throws.
    try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url))) {
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        // Object name of the queue "tmp.incoming.master" on the named broker.
        ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=abcdefdv302.attpmerd.com,destinationType=Queue,destinationName=tmp.incoming.master");
        // Typed proxy for the queue MBean; getters are forwarded over the JMX connection.
        DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers, DestinationViewMBean.class, true);
        // get queue size
        long queueSize = mbView.getQueueSize();
        System.out.println(queueSize);
    }

This failed with the exception below:

Exception in thread "main" java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.NameNotFoundException: jmxrmi
    at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
    at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:268)
    at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:227)
    at amd.esb.amq.App.main(App.java:43)
Caused by: javax.naming.NameNotFoundException: jmxrmi
    at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:116)
    at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:203)
    at javax.naming.InitialContext.lookup(InitialContext.java:411)
    at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1936)
    at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1903)
    at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:286)

Error after trying Jolokia instead:

Exception in thread "main" org.jolokia.client.exception.J4pConnectException: Cannot connect to http://atlesbdv02.amd.com:8161/api/jolokia: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
    at org.jolokia.client.J4pClient.mapException(J4pClient.java:325)
    at org.jolokia.client.J4pClient.execute(J4pClient.java:198)
    at org.jolokia.client.J4pClient.execute(J4pClient.java:168)
    at org.jolokia.client.J4pClient.execute(J4pClient.java:117)
    at com.mycompany.camel.activemq.ActiveMQClient.getNumberOfConsumedMessages(ActiveMQClient.java:33)
    at com.mycompany.camel.activemq.ActiveMQClient.main(ActiveMQClient.java:19)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to atlesbdv02.amd.com:8161 [atlesbdv02.amd.com/10.177.65.102] failed: Connection refused: connect
    at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:140)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:314)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:363)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:219)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:195)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:86)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:108)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
    at org.jolokia.client.J4pClient.execute(J4pClient.java:190)
    ... 4 more
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:72)
    at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:117)
    ... 15 more

Solution

  • We can use JConsole, the Java management console shipped with the JDK, to view all the attributes of a queue, including the number of messages processed. Refer to the links below for more details on ActiveMQ's JMX support and on JConsole:

    http://activemq.apache.org/jmx.html

    https://docs.oracle.com/javase/7/docs/technotes/guides/management/jconsole.html

    Use the code below to read the queue count over JMX, using the same JMX URL that you connect to in JConsole:

    // JMX service URL of the broker. The JNDI name at the end ("karaf-root" here) must be
    // the one the broker actually registers; a wrong name fails with NameNotFoundException.
    String jmxURL = "service:jmx:rmi:///jndi/rmi://hostname:1099/karaf-root";
    String[] userCredentialsArray = new String[2];
    userCredentialsArray[0] = "xxx"; // JMX user name (placeholder)
    userCredentialsArray[1] = "xxx"; // JMX password (placeholder)
    String queueName = "test";

    // Connection environment: credentials are passed as a String[] { user, password }
    // under the standard JMXConnector.CREDENTIALS key.
    java.util.Map<String, Object> credentialsMap = new java.util.HashMap<String, Object>();
    credentialsMap.put(JMXConnector.CREDENTIALS, userCredentialsArray);

    // try-with-resources closes the connector (and its remote connection) on every path.
    try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL), credentialsMap)) {
        LOGGER.info("JMX connection established " + connector);
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        LOGGER.info("MBeanServerConnection connection established "+ connection);
        // Object name of the queue MBean on the broker named "amq".
        ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName="+ queueName);
        LOGGER.info("ObjectName created " + nameConsumers);
        // Typed proxy for the queue MBean; getters are forwarded over the JMX connection.
        DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers,DestinationViewMBean.class, true);
        // NOTE(review): getQueueSize() is the current queue size; for published/consumed totals
        // see getEnqueueCount()/getDequeueCount() on DestinationViewMBean - confirm against the
        // ActiveMQ version in use.
        System.out.println("count - "+mbView.getQueueSize() );
    }