I'm using Hawtio to browse my ActiveMQ queues. I'd also like to be able to edit a JMS message before resending it to another queue.
I don't see how I can edit a message in Hawtio, but that's fine, I guess this is not really legal to modify a message directly in the broker.
Instead, I though I would copy the message body and send a new message with the body modified. Now, the problem I'm facing is that I can only see the first 255 chars of the message body. How can I see the entire ActiveMQ message in hawtio? Not just the first 255 characters.
Hawtio uses to browse the queue the JMX interface. It calls the browse()
method on the queue. Which returns the messages as CompositedData[]
.
When a ActiveMQBytesMessage
is converted (check class org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory
) two fields are added BodyLength
and BodyPreview
. The fields return the following data.
BodyLength
- the length of JMS message bodyBodyPreview
- the first 255 bytes of the JMS message body (the length which is hardcoded, as Claus Ibsen already said in his answer ;-) )Check in class org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory
the method Map<String, Object> getFields(Object o)
.
Hawtio uses the field BodyPreview
to display the message, for non text messages.
Check in Hawtio the file hawtio-web/src/main/webapp/app/activemq/js/browse.ts
function createBodyText(message) {
if (message.Text) {
...
} else if (message.BodyPreview) {
...
if (code === 1 || code === 2) {
// bytes and text
var len = message.BodyPreview.length;
var lenTxt = "" + textArr.length;
body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData;
message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)";
} else {
// bytes only
var len = message.BodyPreview.length;
body = bytesData;
message.textMode = "bytes (" + len + " bytes)";
}
...
} else {
message.textMode = "unsupported";
...
If you want to change it you either have to change it in ActiveMQ or in Hawtio.
A lengthy and verbose example to demonstrate the explanation.
import static java.lang.System.out;
import java.lang.management.ManagementFactory;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
public class BodyPreviewExample {
public static void main(String[] args) throws Exception {
String password = "password";
String user = "user";
String queueName = "TEST_QUEUE";
String brokerUrl = "tcp://localhost:61616";
BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl);
broker.start();
broker.waitUntilStarted();
Connection conn = new ActiveMQConnectionFactory(brokerUrl)
.createConnection(user, password);
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue producerQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(producerQueue);
// create a dummy message
StringBuilder sb = new StringBuilder(1000);
for (int i = 0; i < 100; i++) {
sb.append(">CAFEBABE<");
}
// create and send a JMSBytesMessage
BytesMessage bytesMsg = session.createBytesMessage();
bytesMsg.writeBytes(sb.toString().getBytes());
producer.send(bytesMsg);
// create and send a JMSTextMessage
TextMessage textMsg = session.createTextMessage();
textMsg.setText(sb.toString());
producer.send(textMsg);
producer.close();
out.printf("%nmessage info via session browser%n");
String format = "%-20s = %s%n";
Queue consumerQueue = session.createQueue(queueName);
QueueBrowser browser = session.createBrowser(consumerQueue);
for (Enumeration p = browser.getEnumeration(); p.hasMoreElements();) {
out.println();
Object next = p.nextElement();
if (next instanceof ActiveMQBytesMessage) {
ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next;
out.printf(format, "JMSMessageID", amq.getJMSMessageID());
out.printf(format, "JMSDestination", amq.getJMSDestination());
out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
out.printf(format, "BodyLength", amq.getBodyLength());
} else if (next instanceof ActiveMQTextMessage) {
ActiveMQTextMessage amq = (ActiveMQTextMessage) next;
out.printf(format, "JMSMessageID", amq.getJMSMessageID());
out.printf(format, "JMSDestination", amq.getJMSDestination());
out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
out.printf(format, "text.length", amq.getText().length());
} else {
out.printf("unhandled message type: %s%n", next.getClass());
}
}
session.close();
conn.close();
// access the queue via JMX
out.printf("%nmessage info via JMX browse operation%n");
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.apache.activemq:type=Broker"
+ ",brokerName=localhost"
+ ",destinationType=Queue"
+ ",destinationName=" + queueName);
QueueViewMBean queue
= MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, QueueViewMBean.class, true);
CompositeData[] browse = queue.browse();
for (CompositeData compositeData : browse) {
out.println();
CompositeType compositeType = compositeData.getCompositeType();
out.printf(format, "CompositeType", compositeType.getTypeName());
out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID"));
if (compositeData.containsKey("BodyLength")) {
// body length of the ActiveMQBytesMessage
Long bodyLength = (Long) compositeData.get("BodyLength");
out.printf(format, "BodyLength", bodyLength);
// the content displayed by hawtio
Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview");
out.printf(format, "size of BodyPreview", bodyPreview.length);
} else if (compositeData.containsKey("Text")) {
String text = (String) compositeData.get("Text");
out.printf(format, "Text.length()", text.length());
}
}
// uncomment if you want to check with e.g. JConsole
// TimeUnit.MINUTES.sleep(5);
broker.stop();
}
}
example output
message info via session browser
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:1
JMSDestination = queue://TEST_QUEUE
JMSXMimeType = jms/bytes-message
BodyLength = 1000
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:2
JMSDestination = queue://TEST_QUEUE
JMSXMimeType = jms/text-message
text.length = 1000
message info via JMX browse operation
CompositeType = org.apache.activemq.command.ActiveMQBytesMessage
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:1
BodyLength = 1000
size of BodyPreview = 255
CompositeType = org.apache.activemq.command.ActiveMQTextMessage
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:2
Text.length() = 1000