rabbitmq spring-cloud spring-rabbit

Spring Cloud Stream RabbitMQ add Queue argument


I would like to know how to add extra arguments to a RabbitMQ queue declared by Spring Cloud Stream.

I want to use the Single Active Consumer feature of RabbitMQ 3.8.x. To do that, I have to add an extra argument, x-single-active-consumer, to the queue declaration.
There is no way to configure it directly with Spring properties.


Solution

  • Setting arbitrary queue arguments is not currently supported by Spring Cloud Stream.

    Please open a GitHub issue against the binder to request this as a new feature.

    However, you can declare the queue yourself by adding a Queue @Bean, with the argument set, to the application.

    Alternatively, you can set the binder's exclusive consumer property, which provides similar semantics; the competing consumers will periodically attempt to reconnect.
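    As a rough sketch of the exclusive-consumer alternative, the binding used in this answer (named input) could be configured as below. The recovery interval line is optional and only spells out what is, to my knowledge, the binder's default of 5000 ms between the standby instances' reconnect attempts; treat both lines as a starting point to check against the binder documentation for your version.

    ```properties
    # Only one instance consumes at a time; the others act as hot standbys.
    spring.cloud.stream.rabbit.bindings.input.consumer.exclusive=true
    # How often (ms) a standby instance retries consuming (assumed default shown).
    spring.cloud.stream.rabbit.bindings.input.consumer.recovery-interval=5000
    ```

    Unlike x-single-active-consumer, this is enforced on the consumer side: the standby instances keep failing to consume and retrying until the active one goes away.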

    EDIT

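    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So59011707Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59011707Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(String in) {
            System.out.println(in);
        }
    
        // Declare the queue ourselves so the extra argument can be set.
        // The name must match the binder's queue name: <destination>.<group>.
        @Bean
        Queue queue() {
            return QueueBuilder.durable("so59011707.myGroup")
                    .withArgument("x-single-active-consumer", true)
                    .build();
        }
    
        // Publish a test message to the destination exchange at startup.
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                template.convertAndSend("so59011707", "", "foo");
            };
        }
    
    }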
    

    and

    spring.cloud.stream.bindings.input.destination=so59011707
    spring.cloud.stream.bindings.input.group=myGroup
    

    You will see the following error message in the log, because the binder also tries to declare the queue, but without the argument:

    2019-11-24 10:24:22.310 ERROR 83004 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-single-active-consumer' for queue 'so59011707.myGroup' in vhost '/': received none but current is the value 'true' of type 'bool', class-id=50, method-id=10)

    You can ignore this error. Alternatively, you can avoid it by setting bindQueue to false and adding Exchange and Binding @Beans as well:

    spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
    
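    // Add these beans to the application class above. They require
    // org.springframework.amqp.core.Binding, BindingBuilder and TopicExchange.
    // With bind-queue=false the binder no longer declares or binds the queue,
    // so the application binds it to the destination exchange itself.
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(exchange())
                .with("#");
    }
    
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("so59011707");
    }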
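
For intuition about the PRECONDITION_FAILED error shown earlier, here is a minimal plain-Java sketch of the kind of check behind it: the queue already exists with x-single-active-consumer=true, and a second declaration arrives without that argument. The class QueueDeclareCheck and its method are hypothetical names made up for this illustration. It is a simplified model that compares every argument, not RabbitMQ's actual implementation.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of RabbitMQ or Spring.
final class QueueDeclareCheck {

    // Returns the first argument name (in alphabetical order) whose value
    // differs between the existing queue and the new declaration, if any.
    static Optional<String> firstInequivalentArg(Map<String, Object> current,
                                                 Map<String, Object> received) {
        TreeSet<String> names = new TreeSet<>(current.keySet());
        names.addAll(received.keySet());
        for (String name : names) {
            // A missing argument shows up as null on one side.
            if (!Objects.equals(current.get(name), received.get(name))) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Arguments of the queue declared by the Queue @Bean.
        Map<String, Object> current = Map.of("x-single-active-consumer", true);
        // Arguments sent by a second declaration that omits the argument.
        Map<String, Object> received = Map.of();

        // Mismatch: this is the situation reported as reply-code 406.
        System.out.println(firstInequivalentArg(current, received));
        // Identical arguments: the redeclaration would be accepted.
        System.out.println(firstInequivalentArg(current, current));
    }
}
```

Setting bind-queue=false removes the second declaration altogether, which is why the error disappears once the application owns the queue, exchange and binding.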