Search code examples
spring-cloud-stream

Spring cloud stream rabbit : Queue creation when rabbit server unavailable


I'm using
spring-cloud-stream : 3.1.4
spring-cloud-stream-binder-rabbit: 3.1.4

I have a consumer configured with the here under properties. My issue is when consumer starts before the rabbitmq server is available i can see that the consumer restarts until the connexion is available. Nevertheless the created binding between DLX and DLQ is not the same.

  • If rabbitmq is available when consumer starts : DLQ is bind to DLX with both routing key 'worker.request.queue.name' and 'worker.request.dlq.name'
  • If rabbitmq is not available when consumer starts after some retries : DLQ is only bind to DLX with the routingKey 'worker.request.dlq.name'.

The issue is that i need both binding. Anyone can help me understand what am i doing wrong ?

Thanks.

# 1. consumer queue configuration to listen for worker requests
# Exchange name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.destination=${worker.request.exchange.name}
# Queue name
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.group=${worker.request.queue.name}
# Force creation of queue if doesn't exists.
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.requiredGroups=${worker.request.queue.name}
# Disable retry of error messages
spring.cloud.stream.bindings.listenvalidateprocesssend-in-0.consumer.maxAttempts=1

# 2. producer : to send worker responses to manager
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.destination=${worker.response.exchange.name}
spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.group=${worker.response.queue.name}
# Do not create associated queue. Queue is created by the manager which subscribe to the exchange
# spring.cloud.stream.bindings.listenvalidateprocesssend-out-0.producer.requiredGroups=${worker.response.queue.name}

###############################
# Rabbit binder configuration #
###############################

# 1. consumer queue configuration to listen for worker requests
# Allows naming created queues with only group property. Default is destination.group.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.queueNameGroupOnly=true
# enable transaction of consumed messages. NOTE : the index must not be present in the binding name !!!!!
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in.consumer.transacted=true
# Queue and Exchange for request can be created by manager.
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.maxPriority=255
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterQueueName=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchange=${worker.request.dlx.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterRoutingKey=${worker.request.dlq.name}
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.deadLetterExchangeType=topic
spring.cloud.stream.rabbit.bindings.listenvalidateprocesssend-in-0.consumer.dlqMaxPriority=255

EDIT 1 :

After looking into sprint cloud stream rabbit bidder code, i can see the issue at : https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L659

Each Queue/Exhcnage/Binding declaration is initialized once, an retryed after each attemps. But the list of declaration is a map of bean with a string key. In case of double binding on the same queue, hte key is the same and so only the first declaration is saved. See here : https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/blob/97c6f79fff15d985434e24c6b48c85caa962a4e6/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java#L629


Solution

  • It's a bug; please open an issue here https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues