Search code examples
javajmswildfly-8java-ee-7hornetq

JEE7 + WildFly (HornetQ) - Pause queue from application


We are using WildFly + HornetQ as our application server and JMS message queue, and have the requirement to be able to pause/resume queues from the application. Is this possible?


Solution

  • This can be done using JMX or using the hornetq core management api.

    For the purposes of this example, wildfly 8.1.0.Final was used running the standalone-full-ha profile.

    Required Maven Dependencies:

        <dependency>
            <groupId>org.hornetq</groupId>
            <artifactId>hornetq-jms-client</artifactId>
            <version>2.4.1.Final</version>
        </dependency>
    
        <dependency>
            <groupId>org.wildfly</groupId>
            <artifactId>wildfly-jmx</artifactId>
            <version>8.1.0.Final</version>
        </dependency>
    

    Here is a test class demonstrating the use of JmsQueueControl via JMX:

    package test.jmx.hornetq;
    
    import org.hornetq.api.jms.management.JMSQueueControl;
    
    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    public class WildflyJmsControl {
    
        public static void main(String[] args) {
            try {
                //Get a connection to the WildFly 8 MBean server on localhost
                String host = "localhost";
                int port = 9990;  // management-web port
                String urlString = System.getProperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port);
                JMXServiceURL serviceURL = new JMXServiceURL(urlString);
                JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, null);
                MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
    
                String queueName = "testQueue"; // use your queue name here
    
                String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
                ObjectName objectName = ObjectName.getInstance(mbeanObjectName);
    
                JMSQueueControl jmsQueueControl = (JMSQueueControl) MBeanServerInvocationHandler.newProxyInstance(connection, objectName, JMSQueueControl.class, false);
                assert jmsQueueControl != null;
    
                long msgCount = jmsQueueControl.countMessages(null);
    
                System.out.println(mbeanObjectName + " message count: " + msgCount);
    
                jmsQueueControl.pause();
                System.out.println("queue paused");
    
                jmsQueueControl.resume();
                System.out.println("queue resumed");
    
                jmxConnector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    To access hornetq management via JMS use:

    package test.jms.hornetq;
    
    import org.hornetq.api.core.TransportConfiguration;
    import org.hornetq.api.core.client.*;
    import org.hornetq.api.core.management.ManagementHelper;
    import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
    
    public class HornetqService {
    
        public void testPauseResumeQueue() {
            // this class needs to run in the same jvm as the wildfly server (i.e. not a remote jvm)
            try {
                ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
                        InVMConnectorFactory.class.getName()));
    
                ClientSession session = locator.createSessionFactory().createSession();
    
                session.start();
    
                ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");
    
                String queueName = "testQueue"; // use your queue name here
    
                // get queue message count
                ClientMessage message = session.createMessage(false);
                ManagementHelper.putAttribute(message, queueName, "messageCount");
    
                ClientMessage reply = requester.request(message);
                int count = (Integer) ManagementHelper.getResult(reply);
                System.out.println("There are " + count + " messages in exampleQueue");
    
                // pause the queue
                message = session.createMessage(false);
                ManagementHelper.putOperationInvocation(message, queueName, "pause");
    
                requester.request(message);
    
                // get queue paused
                message = session.createMessage(false);
                ManagementHelper.putAttribute(message, queueName, "paused");
                reply = requester.request(message);
                Object result =  ManagementHelper.getResult(reply);
                System.out.println("result: " + result.getClass().getName() + " : " + result.toString());
    
                // resume queue
                message = session.createMessage(false);
                ManagementHelper.putOperationInvocation(message, queueName, "resume");
                requester.request(message);
    
                // get queue paused
                message = session.createMessage(false);
                ManagementHelper.putAttribute(message, queueName, "paused");
                reply = requester.request(message);
                Object result2 =  ManagementHelper.getResult(reply);
                System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());
    
                requester.close();
    
                session.close();
            }catch (Exception e){
                System.out.println("Error pausing queue" + e.getMessage());
            }
        }
    }