Rabbit config:
package com.rabbitMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
import java.net.URISyntaxException;
@EnableRabbit
@Configuration
public class RabbitMqConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
Overview of application : Whenever an gitRepository is connected to our application, the repository name becomes the exchange name, in this case ForceCI
, then each branchin that repository will create its own queue, here there are two queues develop
and master
. Now everytime a pull request gets created in develop branch I need to pass the information to develop queue and it should be listened by specific listener which should be registered only for develop. I saw examples for dynamic queues but I cannpt seem to find any examples on how to create dynamic listeners which will execute with different threads, how can I achieve this?
Also I am trying to send some messages to queue as test but I am not able to see them in console. (code below)
@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
request) throws URISyntaxException {
rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));
}
@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);
System.out.println("develop -> "+develop);
if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
for (String stringPropertyName : develop.stringPropertyNames()) {
String property = develop.getProperty(stringPropertyName);
System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
}
} else {
Queue queue = new Queue(branchName, true);
String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
System.out.println(develop1);
}
}
@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");
}
@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");
}
UPDATE
Binding was missing, when I gave binding as shown above in code, the messages started going in, but I still cant figure out how to listen these messages in different listeners and process them in different threads?
The simplest way is to use a DirectMessageListenerContainer
and add queues to it as necessary. You won't get a new thread for each queue, though; with the direct container the listener is invoked on a thread from the amqp-client
thread pool.
The direct container is efficient at adding queues; you can start with zero queues if needed. See Choosing a container for more information.
If you MUST have a new thread for each queue, you will have to manually create (and manage) a SimpleMessageListenerContainer
for each.