Search code examples
javarabbitmqamqpspring-amqpspring-rabbit

How to listen to a dynamically created queue?


I have a rabbitListener which continuously listens to user messages of a queue "user-messages" asynchronously. Everything is OK until unless queue is loaded with bulk messages. When messages in bulk published to queue, messages of the same user are processing first thereby messages of other users are waiting for their turn.

I can't use Priority Queue because all the users have equal priority. So I want to create new queues and listen to them at runtime. All the queues will be short-lived as soon as messages consumed. (the queue will be deleted)

On browsing, I found a queue can be dynamically created using RabbitAdmin. But the issues are

  1. How can I make my listener listen to a new short-live (TTL) queue created at runtime?
  2. How can I make the listener stop listening to a deleted queue (after TTL time) to avoid exceptions?

Currently, I'm using SimpleMessageListenerContainerFactory. I've no issues to use DirectMessageListenerContainer as well. My only concern is how to communicate about dynamic queue creation & deletion to Listener. Thinking about to https://www.rabbitmq.com/event-exchange.html (event exchange plugin).

Is there any way that spring-amqp supporting start/stop listening dynamic queues. Thanks in advance.

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(config.getConnectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(3);
        return factory;
    }

    @RabbitListener(id = "listener", queues = {
            "#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
    public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            MessageHeaders headers) {
         //process message
    }


  [1]: https://www.rabbitmq.com/event-exchange.html

Solution

  • this geezer seems to be doing exactly that => https://karadenizfaruk28.medium.com/rabbitmq-dynamic-queue-add-and-listen-at-runtime-with-springboot-c7d42f0447c

    code from the link:

    • rabbitMQ config
    @Configuration
    public class RabbitMqConfiguration implements RabbitListenerConfigurer {
        @Autowired
        private ConnectionFactory connectionFactory;
        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }
        @Bean
        public RabbitTemplate rabbitTemplate() {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            return rabbitTemplate;
        }
        @Bean
        public RabbitAdmin rabbitAdmin() {
            return new RabbitAdmin(connectionFactory);
        }
        @Bean
        public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
            return new RabbitListenerEndpointRegistry();
        }
        @Bean
        public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(consumerJackson2MessageConverter());
            return factory;
        }
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        @Override
        public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setPrefetchCount(1);
            factory.setConsecutiveActiveTrigger(1);
            factory.setConsecutiveIdleTrigger(1);
            factory.setConnectionFactory(connectionFactory);
            registrar.setContainerFactory(factory);
            registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    }
    
    • interface
    public interface RabbitQueueService {
        void addNewQueue(String queueName,String exchangeName,String routingKey);
        void addQueueToListener(String listenerId,String queueName);
        void removeQueueFromListener(String listenerId,String queueName);
        Boolean checkQueueExistOnListener(String listenerId,String queueName);
    }
    
    • service
    @Service
    @Log4j2
    public class RabbitQueueServiceImpl implements RabbitQueueService {
        @Autowired
        private RabbitAdmin rabbitAdmin;
        @Autowired
        private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
        @Override
        public void addNewQueue(String queueName, String exchangeName, String routingKey) {
            Queue queue = new Queue(queueName, true, false, false);
            Binding binding = new Binding(
                    queueName,
                    Binding.DestinationType.QUEUE,
                    exchangeName,
                    routingKey,
                    null
            );
            rabbitAdmin.declareQueue(queue);
            rabbitAdmin.declareBinding(binding);
            this.addQueueToListener(exchangeName,queueName);
        }
    
        @Override
        public void addQueueToListener(String listenerId, String queueName) {
            log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
            if (!checkQueueExistOnListener(listenerId,queueName)) {
                this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
                log.info("queue ");
            } else {
                log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
            }
        }
    
        @Override
        public void removeQueueFromListener(String listenerId, String queueName) {
            log.info("removing queue : " + queueName + " from listener : " + listenerId);
            if (checkQueueExistOnListener(listenerId,queueName)) {
                this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
                log.info("deleting queue from rabbit management");
                this.rabbitAdmin.deleteQueue(queueName);
            } else {
                log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
            }
        }
    
        @Override
        public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
            try {
                log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
                log.info("getting queueNames");
                String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
                log.info("queueNames : " + new Gson().toJson(queueNames));
                if (queueNames != null) {
                    log.info("checking " + queueName + " exist on active queues");
                    for (String name : queueNames) {
                        log.info("name : " + name + " with checking name : " + queueName);
                        if (name.equals(queueName)) {
                            log.info("queue name exist on listener, returning true");
                            return Boolean.TRUE;
                        }
                    }
                    return Boolean.FALSE;
                } else {
                    log.info("there is no queue exist on listener");
                    return Boolean.FALSE;
                }
            } catch (Exception e) {
                log.error("Error on checking queue exist on listener");
                log.error("error message : " + ExceptionUtils.getMessage(e));
                log.error("trace : " + ExceptionUtils.getStackTrace(e));
                return Boolean.FALSE;
            }
        }
    
        private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
            log.info("getting message listener container by id : " + listenerId);
            return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
                    .getListenerContainer(listenerId)
            );
        }
    }