java rabbitmq amqp spring-rabbit

Dynamic queues and listeners, messages not being sent?


Rabbit config:

package com.rabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing administration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

Overview of the application: whenever a Git repository is connected to our application, the repository name becomes the exchange name (in this case ForceCI), and each branch in that repository gets its own queue; here there are two queues, develop and master. Every time a pull request is created against the develop branch, I need to pass the information to the develop queue, and it should be consumed by a listener that is registered only for develop. I have seen examples of dynamic queues, but I cannot find any examples of how to create dynamic listeners that run on different threads. How can I achieve this? I am also trying to send some test messages to the queues, but I do not see them in the console (code below).


@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));

}

@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
        request) throws URISyntaxException {
    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);

    System.out.println("develop -> "+develop);
    if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
        for (String stringPropertyName : develop.stringPropertyNames()) {
            String property = develop.getProperty(stringPropertyName);
            System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
        }
    } else {
        // Declare the durable queue once and reuse the same instance for the binding.
        Queue queue = new Queue(branchName, true);
        String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(queue);
        rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
        System.out.println(develop1);
    }
}

@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");


}

@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties master = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
    String queue_name = master.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");


}

UPDATE

The binding was missing. After I added the binding shown in the code above, messages started arriving in the queues, but I still can't figure out how to consume them with different listeners and process them on different threads.


Solution

  • The simplest way is to use a DirectMessageListenerContainer and add queues to it as necessary. You won't get a new thread for each queue, though; with the direct container the listener is invoked on a thread from the amqp-client thread pool.

    The direct container is efficient at adding queues; you can start with zero queues if needed. See Choosing a container for more information.

    If you MUST have a new thread for each queue, you will have to manually create (and manage) a SimpleMessageListenerContainer for each.
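A minimal sketch of both options, assuming Spring AMQP 2.0 or later (where DirectMessageListenerContainer is available). The class BranchListenerRegistry and its method names are hypothetical and only for illustration; they are not part of Spring AMQP. The shared listener simply prints the raw message body, which will be JSON if the sender uses the Jackson2JsonMessageConverter from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/** Hypothetical helper (not part of Spring AMQP) that registers listeners at runtime. */
public class BranchListenerRegistry {

    private final ConnectionFactory connectionFactory;

    // Option 1: one direct container shared by all branch queues.
    private final DirectMessageListenerContainer sharedContainer;

    // Option 2: one simple container (with its own consumer thread) per queue.
    private final Map<String, SimpleMessageListenerContainer> dedicated = new HashMap<>();

    public BranchListenerRegistry(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        this.sharedContainer = new DirectMessageListenerContainer(connectionFactory);
        // Invoked on an amqp-client thread; the consumer queue tells us which branch it is.
        this.sharedContainer.setMessageListener(message ->
                System.out.println(Thread.currentThread().getName()
                        + " <- " + message.getMessageProperties().getConsumerQueue()
                        + ": " + new String(message.getBody(), StandardCharsets.UTF_8)));
    }

    /** Starts the shared container; it may have zero queues at this point. */
    public void start() {
        sharedContainer.start();
    }

    /** Option 1: add a queue to the shared direct container (works before or after start). */
    public synchronized void listenShared(String queueName) {
        sharedContainer.addQueueNames(queueName);
    }

    /** Option 2: create, start and track a dedicated container for one queue. */
    public synchronized void listenDedicated(String queueName, MessageListener listener) {
        if (dedicated.containsKey(queueName)) {
            return; // already listening on this queue
        }
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listener);
        // Prefix consumer thread names with the queue name so they are easy to spot.
        container.setTaskExecutor(new SimpleAsyncTaskExecutor(queueName + "-"));
        container.start();
        dedicated.put(queueName, container);
    }

    /** Stops consuming from a queue, whichever option it was registered with. */
    public synchronized void stopListening(String queueName) {
        sharedContainer.removeQueueNames(queueName);
        SimpleMessageListenerContainer container = dedicated.remove(queueName);
        if (container != null) {
            container.stop();
        }
    }

    /** Queues currently attached to the shared direct container. */
    public String[] sharedQueueNames() {
        return sharedContainer.getQueueNames();
    }
}
```

With this sketch you would call listenShared(branchName), or listenDedicated(branchName, listener), right after declaring the queue and its binding in createDynamicQueues. Register a given queue with only one of the two options; otherwise both containers will compete for its messages.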