Tags: java, spring, rabbitmq, spring-cloud-stream

Spring cloud stream RabbitMQ - bind DLQ with an exchange using one routing key


I am using Spring Cloud Stream version 3.0.6.RELEASE.

I have an existing exchange called my.queue.exchange. My application contains one consumer, and I want to create a queue called MY_QUEUE and bind it to that exchange. Additionally, I want to republish failed messages to a DLQ called MY_QUEUE_DLQ.

My problem is that the dead letter queue MY_QUEUE_DLQ is bound to the my.queue.exchange.dlx exchange with two routing keys instead of one: the first with routing key my.queue.rkey.dlx and the second with routing key MY_QUEUE.

My consumer bean:

@Bean
public Consumer<Dto> consumeFunction() {

    return dto -> {
       // do stuff
    };
}

My application.yml:

spring:
  cloud:
    stream:
      function:
        definition: consumeFunction
      rabbit:
        bindings:
          consumeFunction-in-0:
            consumer:
              autoBindDlq: true
              deadLetterQueueName: MY_QUEUE_DLQ
              deadLetterExchange: my.queue.exchange.dlx
              deadLetterRoutingKey: my.queue.rkey.dlx
              deadLetterExchangeType: topic
              declareExchange: false
              bindQueue: true
              queueNameGroupOnly: true
              bindingRoutingKey: 'my.queue.rkey'
      bindings:
        consumeFunction-in-0:
          destination: my.queue.exchange
          group: MY_QUEUE

Solution

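  • You need to set republishToDlq: false on the Rabbit consumer properties. When republishToDlq is enabled, the binder adds a second DLQ binding whose routing key is the base queue name (MY_QUEUE in your case), as the following code in RabbitExchangeQueueProvisioner shows (see the comment):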

    if (properties instanceof RabbitConsumerProperties
            && ((RabbitConsumerProperties) properties).isRepublishToDlq()) {
        /*
         * Also bind with the base queue name when republishToDlq is used, which
         * does not know about partitioning
         */
        declareBinding(dlqName, new Binding(dlq.getName(), DestinationType.QUEUE,
                dlxName, baseQueueName, arguments));
    }
    

    You can also get more details here as to why.

    Also, I see you are using

    spring:
      cloud:
        stream:
          function:
            definition: consumeFunction
    

    Please change it to

    spring:
      cloud:
        function:
          definition: consumeFunction
    

    as the other property is deprecated and has already been removed in version 3.2.
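
To make the effect of republishToDlq on the DLQ bindings concrete, here is a minimal sketch in plain Java. It is a simplified model and not the binder's actual code: the class DlqBindingSketch and the method dlqRoutingKeys are hypothetical names, and the assumption that the first binding uses the configured deadLetterRoutingKey is taken from the behaviour described in the question. Only the second, conditional binding mirrors the provisioner snippet quoted above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the DLQ binding decision (not the binder's code). */
final class DlqBindingSketch {

    /** Returns the routing keys the DLQ would be bound with under this model. */
    static List<String> dlqRoutingKeys(String baseQueueName,
                                       String deadLetterRoutingKey,
                                       boolean republishToDlq) {
        List<String> keys = new ArrayList<>();
        // Assumption: the primary DLQ binding uses the configured deadLetterRoutingKey.
        keys.add(deadLetterRoutingKey);
        // Mirrors the quoted snippet: an extra binding keyed by the base queue name
        // is added only when republishToDlq is enabled.
        if (republishToDlq) {
            keys.add(baseQueueName);
        }
        return keys;
    }

    public static void main(String[] args) {
        // republishToDlq enabled: two bindings, as observed in the question.
        System.out.println(dlqRoutingKeys("MY_QUEUE", "my.queue.rkey.dlx", true));
        // republishToDlq disabled: only the configured dead-letter routing key remains.
        System.out.println(dlqRoutingKeys("MY_QUEUE", "my.queue.rkey.dlx", false));
    }
}
```

Under this model the first call yields both my.queue.rkey.dlx and MY_QUEUE, while the second yields only my.queue.rkey.dlx, which is the single binding the question asks for.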