Application overview: Multiple queues can be created dynamically. Each queue should have only one consumer, and that consumer should pick up the next message only after it has finished processing the current one.
Here is my configuration:
@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;

    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing administration functions against an AMQP broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
Listener:
public class RabbitMqConsumer extends SimpleMessageListenerContainer {
    public void startConsumers() throws Exception {
        super.doStart();
    }
}
Handle message method:
public void handleMessage(DeploymentJob deploymentJob) {
    // Deployment running, takes almost 10-15 minutes each
    try {
        System.out.println("deploymentJob.getSocketHandler().getSessions() -> " + deploymentJob.getSocketHandler().getSessions());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Every time a pull request is created, I send its details to RabbitMQ, to a queue named after the target
branch, so that all the pull requests for that branch get validated in queue order, and I create a consumer like this:
rabbitTemplate.convertAndSend(repoName, queue_name, deploymentJob);
RabbitMqConsumer container = new RabbitMqConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(queue_name);
container.setConcurrentConsumers(1);
container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new Jackson2JsonMessageConverter()));
container.startConsumers();
The issue is that this creates a new consumer for every message in the queue, which I want to avoid.
I can't find a way to limit each queue to a single consumer, or to check whether a consumer already exists so that I don't create a new instance of RabbitMqConsumer. Is this possible?
Consumer section
Just use a ConcurrentHashMap keyed by the queue name to determine which queues you already have a consumer for; when you create a new consumer, add it to the map.
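A minimal sketch of that map-based check, in plain Java so it runs without a broker. The class name ConsumerRegistry and its methods are hypothetical, not part of Spring AMQP. The type parameter C stands in for whatever you store per queue, for example the RabbitMqConsumer container from the question. The sketch relies on ConcurrentHashMap.computeIfAbsent, which invokes the factory at most once per key even when several threads ask for the same queue at the same time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical helper (not a Spring AMQP class): remembers which queues
 * already have a consumer so that each queue gets at most one.
 */
class ConsumerRegistry<C> {

    // Queue name -> the single consumer created for that queue.
    private final ConcurrentMap<String, C> consumersByQueue = new ConcurrentHashMap<>();

    /**
     * Returns the consumer already registered for queueName, or creates one
     * with the given factory and registers it if none exists yet.
     * computeIfAbsent applies the factory atomically, at most once per key.
     */
    C getOrCreate(String queueName, Function<String, C> factory) {
        return consumersByQueue.computeIfAbsent(queueName, factory);
    }

    /** True if a consumer has already been registered for queueName. */
    boolean hasConsumer(String queueName) {
        return consumersByQueue.containsKey(queueName);
    }
}
```

In the code from the question, the publishing side would keep one shared ConsumerRegistry<RabbitMqConsumer> and, after convertAndSend, call getOrCreate(queue_name, ...) with a factory that builds, configures and starts the container. A second pull request for the same branch would then reuse the existing entry instead of starting another consumer. Keep the factory reasonably quick, since other updates to the same map region can block while it runs.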