Search code examples
spring-bootrabbitmqspring-amqpspring-rabbit

Creating and binding an exclusive and auto delete rabbitmq queue at runtime, with a defined expiry time fails


I have a use case, where I need create new queues at runtime, and also create consumers for those newly created queues. The queues created at runtime should be exclusive and auto-delete with an expiry time. I followed the pattern that is suggested over - here If I declare them to be both exclusive and auto-delete, without any x-expires argument it works. However, if I set it, I see an error message in the console, whenever the application tries to create a new queue at runtime. Looks like the argument name is wrong or may not be what spring internally expects. Just looking on how to set that expiry time. Below are my classes:

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;

import java.util.HashMap;
import java.util.Map;

import static org.springframework.amqp.support.AmqpHeaders.CONSUMER_QUEUE;

@Configuration
@EnableRabbit
@Slf4j
@RequiredArgsConstructor
public class PrintJobListenerConfiguration
{

  @RabbitListener(queues = "${queue.name}")
  public void listen(@Header(CONSUMER_QUEUE) String queue) {
    log.info("Message read from the queue : {}", queue);
  }

  @Bean
  public Queue queue(@Value("${queue.name}") String name) {
    Map arg = new HashMap<>();
    arg.put("x-expires","20000");
    return new Queue(name, true, true, true, arg);
  }

  @Bean
  DirectExchange exchange(@Value("${exchange.name}") String exchange) {
    return new DirectExchange(exchange);
  }

  @Bean
  Binding binding(
      @Value("${routing.key}") String routingkey, Queue queue, DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingkey);
  }

  @Bean
  public RabbitAdmin admin(ConnectionFactory cf) {
    return new RabbitAdmin(cf);
  }
}

=========================================================================================

import com.mm.alchemy.dynamicqueue.PrintJobListenerConfiguration;
import com.mm.alchemy.print.properties.ApplicationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
@Service
@RequiredArgsConstructor
public class DynamicQueueListenerService {
  private final Map<String, ConfigurableApplicationContext> children = new HashMap<>();
  private final ApplicationContext context;
  private final ApplicationProperties applicationProperties;

  public void addNewDynamicQueueAndListener(String queue) {
    children.put(queue, addNewListener(queue));
  }

  private ConfigurableApplicationContext addNewListener(String queue) {
    AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
    child.setParent(context);
    ConfigurableEnvironment environment = child.getEnvironment();
    Properties properties = new Properties();
    properties.setProperty("queue.name", queue);
    properties.setProperty(
        "exchange.name", applicationProperties.getConsumer().getPrintJobRequestExchangeName());
    properties.setProperty("routing.key", queue);
    PropertiesPropertySource pps = new PropertiesPropertySource("props", properties);
    environment.getPropertySources().addLast(pps);
    child.register(PrintJobListenerConfiguration.class);
    child.refresh();
    return child;
  }
}


Failure stack trace when trying to create a queue at runtime:

2021-05-05 20:28:32.096  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:32.119 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:32.139  INFO 96868 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-05-05 20:28:33.143  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:33.150 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:35.157  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:35.165 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:39.171  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:39.178 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:44.182  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:44.189 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:44.193  INFO 96868 --- [  restartedMain] o.s.a.r.l.SimpleMessageListenerContainer : Broker not available; cannot force queue declarations during start: java.io.IOException
2021-05-05 20:28:44.205  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414
2021-05-05 20:28:44.209  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:733) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:608) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:595) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1347) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1192) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46) ~[amqp-client-5.10.0.jar:5.10.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at com.sun.proxy.$Proxy83.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.3.6.jar:2.3.6]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 1 common frames omitted


Solution

  • arg.put("x-expires","20000");
    

    reply-text=PRECONDITION_FAILED - invalid arg 'x-expires'

    x-expires is an integer argument, not a String.

    I suggest using the QueueBuilder which has type safe methods...

    @Bean
    Queue queue() {
        return QueueBuilder.durable("queue")
                .autoDelete()
                .exclusive()
                .expires(20000)
                .build();
    }