
Get Hawtio to show more than 255 chars in JMS messages


I'm using Hawtio to browse my ActiveMQ queues. I'd also like to be able to edit a JMS message before resending it to another queue.

I don't see how I can edit a message in Hawtio, but that's fine; I guess it isn't really legitimate to modify a message directly in the broker.

Instead, I thought I would copy the message body and send a new message with the modified body. The problem is that I can only see the first 255 characters of the message body. How can I see the entire ActiveMQ message in Hawtio?


Solution

  • Hawtio browses the queue through the JMX interface. It calls the browse() method on the queue, which returns the messages as CompositeData[].

    When an ActiveMQBytesMessage is converted (see class org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory), two fields are added: BodyLength and BodyPreview. They hold the following data:

    • BodyLength - the length of the JMS message body
    • BodyPreview - the first 255 bytes of the JMS message body (the length is hardcoded, as Claus Ibsen already said in his answer ;-) )

    See the method Map<String, Object> getFields(Object o) in class org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory.
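
To make the truncation concrete, here is a minimal stand-alone sketch of what the conversion amounts to: copy at most the first 255 bytes of the body into a boxed Byte[]. This is an illustration only, not the ActiveMQ source; the class name, the method name and the constant MAX_PREVIEW_BYTES are made up for this example.

```java
import java.util.Arrays;

public class BodyPreviewSketch {

    // Hypothetical name for the hardcoded limit described above;
    // not taken from the ActiveMQ source.
    static final int MAX_PREVIEW_BYTES = 255;

    // Returns at most the first 255 bytes of the body, boxed as Byte[]
    // (the element type used for BodyPreview in the JMX browse() result).
    static Byte[] preview(byte[] body) {
        int len = Math.min(body.length, MAX_PREVIEW_BYTES);
        Byte[] boxed = new Byte[len];
        for (int i = 0; i < len; i++) {
            boxed[i] = body[i]; // autoboxing byte -> Byte
        }
        return boxed;
    }

    public static void main(String[] args) {
        // A 1000-byte dummy body, the same size as in the example further down.
        byte[] body = new byte[1000];
        Arrays.fill(body, (byte) 'x');
        System.out.println("BodyLength          = " + body.length);
        System.out.println("size of BodyPreview = " + preview(body).length);
    }
}
```

Whatever the real body size is, a JMX client such as Hawtio never receives more than those first 255 bytes, while BodyLength still reports the full size.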

    For non-text messages, Hawtio uses the BodyPreview field to display the message.

    See the file hawtio-web/src/main/webapp/app/activemq/js/browse.ts in the Hawtio source:

    function createBodyText(message) {
        if (message.Text) {
            ...
        } else if (message.BodyPreview) {
            ...
            if (code === 1 || code === 2) {
                // bytes and text
                var len = message.BodyPreview.length;
                var lenTxt = "" + textArr.length;
                body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData;
                message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)";
            } else {
                // bytes only
                var len = message.BodyPreview.length;
                body = bytesData;
                message.textMode = "bytes (" + len + " bytes)";
            }
            ...
        } else {
            message.textMode = "unsupported";
            ...
    

    To change this limit, you have to change it either in ActiveMQ or in Hawtio.

    Here is a lengthy and verbose example that demonstrates the explanation above.

    import static java.lang.System.out;
    import java.lang.management.ManagementFactory;
    import java.util.Enumeration;
    import java.util.concurrent.TimeUnit;
    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueBrowser;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.management.MBeanServer;
    import javax.management.MBeanServerInvocationHandler;
    import javax.management.ObjectName;
    import javax.management.openmbean.CompositeData;
    import javax.management.openmbean.CompositeType;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.jmx.QueueViewMBean;
    import org.apache.activemq.command.ActiveMQBytesMessage;
    import org.apache.activemq.command.ActiveMQTextMessage;
    
    public class BodyPreviewExample {
    
        public static void main(String[] args) throws Exception {
            String password = "password";
            String user = "user";
            String queueName = "TEST_QUEUE";
            String brokerUrl = "tcp://localhost:61616";
    
            BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl);
            broker.start();
            broker.waitUntilStarted();
    
            Connection conn = new ActiveMQConnectionFactory(brokerUrl)
                    .createConnection(user, password);
            conn.start();
    
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue producerQueue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(producerQueue);
    
            // create a dummy message
            StringBuilder sb = new StringBuilder(1000);
            for (int i = 0; i < 100; i++) {
                sb.append(">CAFEBABE<");
            }
    
            // create and send a JMSBytesMessage
            BytesMessage bytesMsg = session.createBytesMessage();
            bytesMsg.writeBytes(sb.toString().getBytes());
            producer.send(bytesMsg);
    
            // create and send a JMSTextMessage
            TextMessage textMsg = session.createTextMessage();
            textMsg.setText(sb.toString());
            producer.send(textMsg);
    
            producer.close();
    
            out.printf("%nmessage info via session browser%n");
            String format = "%-20s = %s%n";
            Queue consumerQueue = session.createQueue(queueName);
            QueueBrowser browser = session.createBrowser(consumerQueue);
            for (Enumeration<?> p = browser.getEnumeration(); p.hasMoreElements();) {
                out.println();
                Object next = p.nextElement();
                if (next instanceof ActiveMQBytesMessage) {
                    ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next;
                    out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                    out.printf(format, "JMSDestination", amq.getJMSDestination());
                    out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                    out.printf(format, "BodyLength", amq.getBodyLength());
                } else if (next instanceof ActiveMQTextMessage) {
                    ActiveMQTextMessage amq = (ActiveMQTextMessage) next;
                    out.printf(format, "JMSMessageID", amq.getJMSMessageID());
                    out.printf(format, "JMSDestination", amq.getJMSDestination());
                    out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
                    out.printf(format, "text.length", amq.getText().length());
                } else {
                    out.printf("unhandled message type: %s%n", next.getClass());
                }
            }
            session.close();
            conn.close();
    
            // access the queue via JMX
            out.printf("%nmessage info via JMX browse operation%n");
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("org.apache.activemq:type=Broker"
                    + ",brokerName=localhost"
                    + ",destinationType=Queue"
                    + ",destinationName=" + queueName);
            QueueViewMBean queue
                    = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
                            name, QueueViewMBean.class, true);
            CompositeData[] browse = queue.browse();
            for (CompositeData compositeData : browse) {
                out.println();
                CompositeType compositeType = compositeData.getCompositeType();
                out.printf(format, "CompositeType", compositeType.getTypeName());
                out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID"));
                if (compositeData.containsKey("BodyLength")) {
                    // body length of the ActiveMQBytesMessage
                    Long bodyLength = (Long) compositeData.get("BodyLength");
                    out.printf(format, "BodyLength", bodyLength);
                    // the content displayed by hawtio
                    Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview");
                    out.printf(format, "size of BodyPreview", bodyPreview.length);
                } else if (compositeData.containsKey("Text")) {
                    String text = (String) compositeData.get("Text");
                    out.printf(format, "Text.length()", text.length());
                }
            }
            // uncomment if you want to check with e.g. JConsole
            // TimeUnit.MINUTES.sleep(5);
            broker.stop();
        }
    }
    

    Example output:

    message info via session browser
    
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
    JMSDestination       = queue://TEST_QUEUE
    JMSXMimeType         = jms/bytes-message
    BodyLength           = 1000
    
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
    JMSDestination       = queue://TEST_QUEUE
    JMSXMimeType         = jms/text-message
    text.length          = 1000
    
    message info via JMX browse operation
    
    CompositeType        = org.apache.activemq.command.ActiveMQBytesMessage
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:1
    BodyLength           = 1000
    size of BodyPreview  = 255
    
    CompositeType        = org.apache.activemq.command.ActiveMQTextMessage
    JMSMessageID         = ID:hostname-50075-1467979678722-3:1:1:1:2
    Text.length()        = 1000
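
The example reads BodyPreview as a Byte[] and only prints its size. To look at the preview as text, the boxed array has to be unboxed first. The helper below is a small sketch of that step; the class and method names are hypothetical, and decoding as UTF-8 is an assumption that only holds if the producer wrote UTF-8 (or plain ASCII) bytes.

```java
import java.nio.charset.StandardCharsets;

public class PreviewDecoder {

    // Unboxes a Byte[] (as delivered in the JMX CompositeData) and decodes it.
    // Assumption: the body bytes are UTF-8 encoded text.
    static String toText(Byte[] preview) {
        byte[] raw = new byte[preview.length];
        for (int i = 0; i < preview.length; i++) {
            raw[i] = preview[i]; // auto-unboxing Byte -> byte
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for (Byte[]) compositeData.get("BodyPreview")
        Byte[] preview = { (byte) '>', (byte) 'C', (byte) 'A', (byte) 'F', (byte) 'E' };
        System.out.println(toText(preview));
    }
}
```

Applied to the JMX result above, this would still yield only the first 255 bytes of the body, since that is all the browse() operation returns for a bytes message.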