Search code examples
javajmsamqpazureservicebusqpid

Apache QPID JMS Client - setting prefetch doesn't work


I have a JmsConnectionFactory configured with a URI such as this:

failover:(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true&jms.prefetchPolicy.all=10)?failover.maxReconnectAttempts=20

Note the jms.prefetchPolicy.all=10 parameter, which according to the official documentation

... controls how many messages the remote peer can send to the client and be held in a prefetch buffer for each consumer instance.

So I shouldn't see more than 10 messages buffered in the client, right? Well, that doesn't work.

I've ended up using reflection to periodically print the MessageQueue.size() of each JmsMessageConsumer:

MessageConsumer messageConsumer = ...
Field field = JmsMessageConsumer.class.getDeclaredField("messageQueue");
field.setAccessible(true);
MessageQueue q = (MessageQueue) field.get(messageConsumer);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> System.out.println(q.size()), 10, 10, TimeUnit.SECONDS);

And when my message handler is slow (or blocked), I see queue sizes of slightly less than 1000 messages, which is the default prefetch size.

So - is this a bug? And how do I go about setting a different prefetch size?

I'm using qpid-jms-client, version 0.27.0.


Solution

  • Your URI is incorrect, it should be:

    failover://(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true)
        ?jms.prefetchPolicy.all=10&failover.maxReconnectAttempts=20
    

    The JMS options are global so they are applied to the outermost part of the URI along with the failover options. The wrapped AMQP connection URIs contain only transport options that control each specific connection.