Tags: java, spring, spring-rabbit

@RabbitListener start binding to fanout exchange in "stopped" mode


I want to bind an anonymous queue to a fanout exchange as soon as the app starts to collect messages, but the actual processing of messages should be done later (after some initialization elsewhere). I tried with:

@RabbitListener(autoStartup="false",
                    bindings = @QueueBinding(value = @Queue,
                    exchange = @Exchange(name="myexchange",
                                         type=ExchangeTypes.FANOUT)))
public void processMessage(String message) {
}

but with autoStartup="false" the (anonymous) queue is never declared, so it is not bound to the exchange.

In other words, I need the anonymous queue to be bound to the exchange as soon as the app starts, and to start reading messages only at a later time. Is this possible with @RabbitListener?

Update: I tried declaring the queue, exchange and binding as beans, but the queue is not created on the broker unless I also declare a @RabbitListener for it:

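import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("myexchange");
    }
    private static class ReceiverConfig {
        @Bean
        public Queue myQueue() {
            // Broker-named, exclusive, auto-delete queue
            return new AnonymousQueue();
        }
        @Bean
        public Binding binding(FanoutExchange fanout, Queue myQueue) {
            return BindingBuilder.bind(myQueue).to(fanout);
        }
    }
}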

The queue only shows up once I also add the @RabbitListener:

@Component
public class AmqpReceiver {
    @RabbitListener(queues = "#{myQueue.name}")
    public void receive(String in) throws InterruptedException {
    }
}

Solution

  • Since you are not starting the listener, nothing opens a connection to the broker, so nothing is declared.

    As long as the queue, the binding and a RabbitAdmin are defined as beans in your application context, all you need to do is force the connection to be opened. The RabbitAdmin listens for new connections and declares the exchanges, queues and bindings when one is created.

    Simply call createConnection() on the CachingConnectionFactory.

    EDIT

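    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class So49401150Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So49401150Application.class, args);
        }
    
        @Bean
        ApplicationRunner runner(ConnectionFactory cf, RabbitTemplate template,
                RabbitListenerEndpointRegistry registry) {
            return args -> {
                cf.createConnection().close(); // Admin does declarations here
                // Published before the listener runs; it waits in the bound queue
                template.convertAndSend("myexchange", "", "foo");
                Thread.sleep(30_000); // stands in for the later initialization
                registry.start(); // starts the listener containers
            };
        }
    
        // autoStartup="false": the container is registered but not started
        @RabbitListener(autoStartup="false",
                bindings = @QueueBinding(value = @Queue,
                exchange = @Exchange(name="myexchange",
                                     type=ExchangeTypes.FANOUT)))
        public void processMessage(String message) {
            System.out.println(message);
        }
    
    }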
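
    To make the mechanism easier to see without a broker, here is a minimal plain-Java sketch of the idea that declarations only happen when a connection is opened. Every class in it (ToyBroker, ToyConnectionFactory, ToyAdmin, ToyConnectionListener) is a hypothetical stand-in invented for illustration and is not a Spring AMQP type; it only models the "admin listens for new connections" behaviour described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: hypothetical stand-ins, not Spring AMQP classes.
class ToyBroker {
    // Names of everything that has been declared on the simulated broker
    final Set<String> declared = new LinkedHashSet<>();
}

interface ToyConnectionListener {
    void onCreate(ToyBroker broker);
}

class ToyConnectionFactory {
    private final ToyBroker broker;
    private final List<ToyConnectionListener> listeners = new ArrayList<>();
    private boolean connected;

    ToyConnectionFactory(ToyBroker broker) {
        this.broker = broker;
    }

    void addConnectionListener(ToyConnectionListener listener) {
        listeners.add(listener);
    }

    // The first call opens the simulated connection and notifies listeners;
    // later calls reuse it and do nothing.
    void createConnection() {
        if (!connected) {
            connected = true;
            for (ToyConnectionListener listener : listeners) {
                listener.onCreate(broker);
            }
        }
    }
}

class ToyAdmin implements ToyConnectionListener {
    private final List<String> declarables;

    ToyAdmin(ToyConnectionFactory cf, List<String> declarables) {
        this.declarables = declarables;
        cf.addConnectionListener(this); // declare lazily, on connection creation
    }

    @Override
    public void onCreate(ToyBroker broker) {
        broker.declared.addAll(declarables);
    }
}

class LazyDeclarationDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        ToyConnectionFactory cf = new ToyConnectionFactory(broker);
        new ToyAdmin(cf, Arrays.asList(
                "exchange:myexchange", "queue:anonymous", "binding:anonymous->myexchange"));

        // No listener started and no connection opened: nothing is declared yet
        System.out.println("before connection: " + broker.declared);

        cf.createConnection(); // forcing the connection triggers the declarations
        System.out.println("after connection:  " + broker.declared);
    }
}
```

    In the real application the same role is played by cf.createConnection().close() in the runner: the stopped listener never opens a connection, so something else has to.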