spring-boot rabbitmq spring-rabbit

How to configure only one consumer per queue in SpringBoot:RabbitMQ?


Application overview: Multiple queues can be created dynamically. Each queue should have only one consumer, and that consumer should pick up the next message only after it has finished processing the current one.

Here is my configuration:

@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing administration functions against an AMQP broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

Listener:

public class RabbitMqConsumer extends SimpleMessageListenerContainer {

    public void startConsumers() throws Exception {
        super.doStart();
    }
}

Handle message method:

public void handleMessage(DeploymentJob deploymentJob) {
    // Deployment running, takes almost 10-15 minutes each
    try {
        System.out.println("deploymentJob.getSocketHandler().getSessions() -> "+deploymentJob.getSocketHandler().getSessions());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Every time a pull request is created, I send its details to RabbitMQ, to a queue named after the target branch, so that all pull requests are validated in queue order. I then create a consumer like this:

    rabbitTemplate.convertAndSend(repoName, queue_name, deploymentJob);
    RabbitMqConsumer container = new RabbitMqConsumer();
    container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
    container.setQueueNames(queue_name);
    container.setConcurrentConsumers(1);
    container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new Jackson2JsonMessageConverter()));
    container.startConsumers();

The issue is that this creates a new consumer for every message in the queue, which I want to avoid. I can't find a way to limit each queue to a single consumer, or to check whether a consumer already exists and skip creating a new RabbitMqConsumer instance if it does. Is this possible?

[Screenshot: Consumer section]


Solution

  • Use a ConcurrentHashMap keyed by queue name to track which queues already have a consumer. Before creating a consumer, check the map; create one only if the queue has no entry yet, and add the new consumer to the map.
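
A minimal sketch of that idea, using only the JDK so it can be read on its own. The class name QueueConsumerRegistry and the method ensureConsumer are hypothetical helpers for illustration, not part of Spring AMQP; the factory passed to the constructor stands in for the code from the question that builds and starts a RabbitMqConsumer for a given queue name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Keeps at most one consumer per queue name.
 * Hypothetical helper for illustration; C would be RabbitMqConsumer in the question.
 */
final class QueueConsumerRegistry<C> {

    // Queue name -> the single consumer created for that queue.
    private final ConcurrentMap<String, C> consumers = new ConcurrentHashMap<>();

    // Assumed to create (and start) a consumer for the given queue name.
    private final Function<String, C> consumerFactory;

    QueueConsumerRegistry(Function<String, C> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /** Returns the existing consumer for the queue, creating one only on first use. */
    C ensureConsumer(String queueName) {
        // ConcurrentHashMap.computeIfAbsent is atomic per key: the factory runs
        // at most once for a queue name, even if several threads publish to it
        // at the same time.
        return consumers.computeIfAbsent(queueName, consumerFactory);
    }

    /** Number of queues that currently have a consumer. */
    int size() {
        return consumers.size();
    }
}
```

In the question's code, the block that builds the container would move into the factory, and the sending code would call ensureConsumer(queue_name) after convertAndSend instead of creating a new RabbitMqConsumer each time. The registry has to be a single shared instance (for example a Spring bean) for the check to work across requests.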