I have a rabbitListener which continuously listens to user messages of a queue "user-messages" asynchronously. Everything is OK until unless queue is loaded with bulk messages. When messages in bulk published to queue, messages of the same user are processing first thereby messages of other users are waiting for their turn.
I can't use Priority Queue because all the users have equal priority. So I want to create new queues and listen to them at runtime. All the queues will be short-lived as soon as messages consumed. (the queue will be deleted)
On browsing, I found a queue can be dynamically created using RabbitAdmin. But the issues are
Currently, I'm using SimpleMessageListenerContainerFactory. I've no issues to use DirectMessageListenerContainer as well. My only concern is how to communicate about dynamic queue creation & deletion to Listener. Thinking about to https://www.rabbitmq.com/event-exchange.html (event exchange plugin).
Is there any way that spring-amqp supporting start/stop listening dynamic queues. Thanks in advance.
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(config.getConnectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(3);
return factory;
}
@RabbitListener(id = "listener", queues = {
"#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
MessageHeaders headers) {
//process message
}
[1]: https://www.rabbitmq.com/event-exchange.html
this geezer seems to be doing exactly that => https://karadenizfaruk28.medium.com/rabbitmq-dynamic-queue-add-and-listen-at-runtime-with-springboot-c7d42f0447c
code from the link:
@Configuration
public class RabbitMqConfiguration implements RabbitListenerConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(1);
factory.setConsecutiveActiveTrigger(1);
factory.setConsecutiveIdleTrigger(1);
factory.setConnectionFactory(connectionFactory);
registrar.setContainerFactory(factory);
registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
}
public interface RabbitQueueService {
void addNewQueue(String queueName,String exchangeName,String routingKey);
void addQueueToListener(String listenerId,String queueName);
void removeQueueFromListener(String listenerId,String queueName);
Boolean checkQueueExistOnListener(String listenerId,String queueName);
}
@Service
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Override
public void addNewQueue(String queueName, String exchangeName, String routingKey) {
Queue queue = new Queue(queueName, true, false, false);
Binding binding = new Binding(
queueName,
Binding.DestinationType.QUEUE,
exchangeName,
routingKey,
null
);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
this.addQueueToListener(exchangeName,queueName);
}
@Override
public void addQueueToListener(String listenerId, String queueName) {
log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
if (!checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
log.info("queue ");
} else {
log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
}
}
@Override
public void removeQueueFromListener(String listenerId, String queueName) {
log.info("removing queue : " + queueName + " from listener : " + listenerId);
if (checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
log.info("deleting queue from rabbit management");
this.rabbitAdmin.deleteQueue(queueName);
} else {
log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
}
}
@Override
public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
try {
log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
log.info("getting queueNames");
String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
log.info("queueNames : " + new Gson().toJson(queueNames));
if (queueNames != null) {
log.info("checking " + queueName + " exist on active queues");
for (String name : queueNames) {
log.info("name : " + name + " with checking name : " + queueName);
if (name.equals(queueName)) {
log.info("queue name exist on listener, returning true");
return Boolean.TRUE;
}
}
return Boolean.FALSE;
} else {
log.info("there is no queue exist on listener");
return Boolean.FALSE;
}
} catch (Exception e) {
log.error("Error on checking queue exist on listener");
log.error("error message : " + ExceptionUtils.getMessage(e));
log.error("trace : " + ExceptionUtils.getStackTrace(e));
return Boolean.FALSE;
}
}
private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
log.info("getting message listener container by id : " + listenerId);
return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
.getListenerContainer(listenerId)
);
}
}