Search code examples
javaspringspring-amqpspring-rabbit

Spring AMQP Set prefetchCount on Queue Bean


How do I set the prefetchCount on for this queue consumer?

@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
    Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
            ...
            .build();

    TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);

    amqpAdmin.declareBinding(BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(EVENT_ROUTING_KEY));

    return queue;
}

The documentation notes that prefetchCount this is a container configuration, but setting it on my factory bean did not work and the value defaulted to 250:

    @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(10); // doesn't work; defaults to 250
        return factory;
    }

UPDATE

Per @GaryRussell's comment below, I did test out the default rabbitListenerContainerFactory and also validated my Spring Boot configuration spring.rabbitmq.listener.simple.prefetch was being consumed in AbstractRabbitListenerContainerFactoryConfigurer. However, when I look at the queue consumers in RabbitMQ, I can see that the queues that I define using the default container setup still have a prefetchCount of 250:

rabbit admin shows consumer prefetch 250

I'm using the RabbitMQ admin panel as the source of truth. I don't think it's lying, because I have a bunch of dynamic queues that are instantiated with custom containers, and those do have non-default (correct) prefetchCounts. I also validated in the Spring container startup that there was only the one (expected) rabbitListenerContainerFactory bean.


Solution

  • Prefetch is not a queue property, it's a consumer property.

    What does your listener look like?

    You are using a non-standard name for the container factory.

    You either need to add containerFactory property to the @RabbitListener or you need to rename your bean to rabbitListenerContainerFactory (overriding Boot's defined factory @Bean).

    Also

    amqpAdmin.declareBinding(BindingBuilder
    

    You should not be talking to the broker in a bean definition - it's too early.

    Simply add your queue, exchange, and binding as @Beans and You can also just set the prefetch in the application.properties/yaml file (if you are using Spring Boot). The admin will find them and declare them when the connection is first opened.

    EDIT

    There's someething else going on...

    @SpringBootApplication
    public class So62049769Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So62049769Application.class, args);
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so62049769");
        }
    
        @RabbitListener(queues = "so62049769")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    
    spring.rabbitmq.listener.simple.prefetch=42
    

    enter image description here