Search code examples
pythonstompactivemq-artemisstomp.py

Receiving "Error reading in simpleString, length=[] is greater than readableBytes=[]" sending message to ActiveMQ using Stomp.py


I'm trying to send and receive messages using Apache ActiveMQ Artemis 2.24, using Python to send messages using STOMP, and receiving messages in Java. The Stomp.py version is 8.0.1. Python version is 3.10.4. Java version is 1.8.0_342.

If I run my Java based message consumer and send a message using Java code, everything works fine. But if I send a message to the queue using Stomp.py I get the following exception on the receive side:

Exception in thread "main" java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1864388729 is greater than readableBytes=10
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:103)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:54)

The message consumer looks like this:

import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQConsumerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, our queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }
        
        // And a consumer attached to the queue ...

        ClientConsumer consumer =  session.createConsumer("example");

        session.start();
        
        while( true )
        {
            System.out.println( "Listening..." );
            ClientMessage msgReceived = consumer.receive();

            System.out.println("message = " + msgReceived.getBodyBuffer().readString());
            
            msgReceived.acknowledge();
        
            session.commit();
        }
    }
}

The Java message producer that works looks like this:

import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ActiveMQProducerMain 
{
    public static void main(String[] args) throws Exception
    {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://172.16.1.141:61616");

        ClientSessionFactory factory =  locator.createSessionFactory();
        ClientSession session = factory.createSession( "username", "password", false, true, true, false, 4096 );
        
        ClientProducer producer = session.createProducer("example");
        ClientMessage message = session.createMessage(true);
        message.getBodyBuffer().writeString("Hello world!!!");

        // We need a queue attached to the address ...

        try
        {
            session.createQueue("example", RoutingType.ANYCAST, "example", true);
        }
        catch( ActiveMQQueueExistsException amqe )
        {
            if( amqe.getMessage().contains("already exists" ))
            {
                // no problem, out queue already exists
                System.out.println( "Queue already exists on server!" );
            }
            else
            {
                amqe.printStackTrace();
            }
        }

        // Once we have a queue, we can send the message ...

        producer.send(message); 
    }

}

And the Python message producer code is as follows:

import time
import stomp

def main():
    print( "Sending ActiveMQ message using STOMP client!\n" )
    
    conn = stomp.Connection( [('172.16.1.141', 61613)] )
    
    conn.connect( wait=True, headers={'consumerWindowSize': 0})
    
    conn.send(body='Hello Python World', destination='example')
    time.sleep(5)
    conn.disconnect()
    
    exit()
    
if __name__ == "__main__":
    main()
    

EDIT 1:

Tried something else - I set up a Stomp.py message consumer and when I run that, and send the message with Stomp.py, everything works fine. So it seems that both the Java and Python client libraries fundamentally "work" but something about the intersection of the two (eg, sending from Python, receiving in Java) is breaking.

EDIT 2:

Also, if I run the Python based message consumer, and send using the Java message producer, the Python code receives a message, but the content of the message appears to be empty. So, again, it looks like there is some weird mismatch between what's happening in "Java land" and what's happening in "Python land."

Any thoughts on what could be causing this?

EDIT 3:

I tried switching to readNullableSimpleString() per the answer from Justin below, so now my code looks like this:

System.out.println("message = " + msgReceived.getBodyBuffer().readNullableSimpleString() );

and now I get this:

java.lang.IndexOutOfBoundsException: Error reading in simpleString, length=1701604463 is greater than readableBytes=13
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:183)
    at org.apache.activemq.artemis.api.core.SimpleString.readSimpleString(SimpleString.java:171)
    at org.apache.activemq.artemis.api.core.SimpleString.readNullableSimpleString(SimpleString.java:158)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:69)
    at org.fogbeam.experimental.bosworth.activemq.ActiveMQConsumerMain.main(ActiveMQConsumerMain.java:64)

Looks like a call to readNullableSimpleString still ultimately triggers a call to readSimpleString() which errors out.

EDIT 4:

I still don't understand the "why" of exactly what's happening under the hood, but I did find a way to get my message successfully in the consumer. This mechanism works:

                int bodySize = msgReceived.getBodySize();
                byte[] bytes = new byte[bodySize];
                msgReceived.getBodyBuffer().readBytes( bytes );
                System.out.println("message = " + new String( bytes ) );

Looks like the STOMP message is being turned into a BytesMessage and not a TextMessage under the hood? Or something close to that. And some older discussion I found suggests that this does, in turn, relate to the presence or absence of the content-length header. Weird. But for now I'm happy I at least a path forward, even if it's not the perfect path forward.


Solution

  • When the broker receives a STOMP message with no content-length header then it encodes the body as a nullable SimpleString using the writeNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. Therefore, when you receive that message as a core message you need to read that data as a nullable SimpleString using the readNullableSimpleString method on org.apache.activemq.artemis.api.core.ActiveMQBuffer. This is outlined in the documentation.

    The reason this is working with your core producer and core consumer is they are using writeString and readString respectively.

    Likewise, it doesn't work with your core producer and Python consumer because the core producer is using writeString and the message is being converted into a STOMP MESSAGE frame using readNullableSimpleString which, of course, doesn't work.

    It is important to note that how you get the ActiveMQBuffer from the ClientMessage matters due to the way we track the index on the buffer internally. For example, the JavaDoc on ClientMessage#getBodyBuffer states:

    The buffer to write the body.

    Warning: If you just want to read the content of a message, use getDataBuffer() or getReadOnlyBuffer();

    Therefore, when you use readNullableSimpleString you'll want to do so by using either msgReceived.getDataBuffer().readNullableSimpleString() or msgReceived.getReadOnlyBuffer().readNullableSimpleString()

    Lastly, keep in mind that the body of a message is ultimately just an array of bytes. The broker has no way of knowing what kind of data is in the array. It could be human readable text or it could be binary data, and even if it was text it could be encoded any number of ways. In order for clients to exchange messages with each other they must use a common format. In this case, Java clients using the core API must use a nullable SimpleString.