Tags: spring-integration, activemq-classic, spring-integration-dsl

ActiveMQ: Dispatched queue contains more messages than prefetch size


I have the prefetch size set to 1 (jms.prefetchPolicy.all=1 in the broker URL). In the web console I can see that the prefetch is 1 for all of my consumers. One consumer got stuck, and there were 67 messages on its dispatch queue (see my screenshot).

Could you help me understand how this could happen? I've read plenty of articles on this, and my understanding is that the dispatch queue size should be at most the prefetch size.
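
For reference, the prefetch limit described above is passed as an option on the broker URL. The following is only a minimal sketch of how such a URL might be assembled: the host, port, and the helper name `withPrefetch` are assumptions for illustration, and only the `jms.prefetchPolicy.all` option comes from the question. It handles plain `tcp://` style URLs and does not try to parse composite (for example failover) URLs with nested options.

```java
final class PrefetchUrlExample {

    // Hypothetical helper (not part of ActiveMQ): appends
    // jms.prefetchPolicy.all=<prefetch> to a simple broker URL.
    static String withPrefetch(String brokerUrl, int prefetch) {
        if (prefetch < 0) {
            throw new IllegalArgumentException("prefetch must be >= 0");
        }
        // Use '&' if the URL already carries options, '?' otherwise.
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.all=" + prefetch;
    }

    public static void main(String[] args) {
        // Assumed host and port; substitute the real broker address.
        // prints tcp://localhost:61616?jms.prefetchPolicy.all=1
        System.out.println(withPrefetch("tcp://localhost:61616", 1));
    }
}
```

The resulting string is what would be passed to `setBrokerURL` in the configuration shown in the question.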

I use the following configuration to consume messages from the queue:

ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(brokerUrl);
    activeMQConnectionFactory.setUserName(user);
    activeMQConnectionFactory.setPassword(password);
    activeMQConnectionFactory.setNonBlockingRedelivery(true);

    // Configure the redelivery policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelay);
    redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
    redeliveryPolicy.setUseExponentialBackOff(useExponentialBackOff);
    redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(thumbnailQueue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    return activeMQConnectionFactory;
}

public IntegrationFlow createThumbnailFlow(String concurrency, CreateThumbnailReceiver receiver) {
    return IntegrationFlows.from(
            Jms.messageDrivenChannelAdapter(
                    Jms.container(getActiveMQConnectionFactory(), thumbnailQueue)
                            .concurrency(concurrency)
                            .sessionTransacted(true)
                            .get()
            ))
            .transform(new JsonToObjectTransformer(CreateThumbnailRequest.class, jsonObjectMapper()))
            .handle(receiver)
            .get();
}

Solution

  • The problem was caused by a version mismatch between the broker (5.14.5) and the client (5.15.3). After upgrading the broker, the dispatched queue contains at most 2 messages, as expected.
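
Since the root cause was a broker/client version mismatch, a small startup check could flag it early. The sketch below is only an illustration: the class and method names are hypothetical, the version strings are assumed to be supplied by the caller (for example, read from the web console and from the client library's metadata), and treating "same major.minor" as compatible is an assumption, not a rule stated by ActiveMQ.

```java
final class VersionCheckExample {

    // Extracts "major.minor" from a dotted version string such as "5.15.3".
    static String majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    // Assumption: broker and client count as matching when their
    // major.minor components are equal.
    static boolean sameRelease(String brokerVersion, String clientVersion) {
        return majorMinor(brokerVersion).equals(majorMinor(clientVersion));
    }

    public static void main(String[] args) {
        // The versions reported in the solution above.
        String broker = "5.14.5";
        String client = "5.15.3";
        if (!sameRelease(broker, client)) {
            System.out.println("Warning: broker " + broker
                    + " and client " + client + " are different releases");
        }
    }
}
```

With the versions from the solution, the check reports a mismatch; after upgrading the broker to a 5.15.x release it would stay silent.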