Tags: java, spring-boot, rabbitmq, amqp, spring-cloud-stream

How to handle unknown/invalid binding routing key values in RabbitMQ?


What is the best way to handle messages whose routing key matches no binding on an exchange? In my case, I publish all my messages to the same exchange, and each message is routed to the corresponding queue based on its routing key. Here's my configuration (I'm using Spring Cloud Stream):

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

My question is: what happens if I send a message with payload.type='ANY'? Obviously, this message won't be retrieved by any consumer and will remain inside the exchange, but what is the best way to keep track of these "unknown" messages? Can I use a DLQ for this?

Thanks!


Solution

  • "will remain inside the exchange"

    No; exchanges don't "hold" messages. They are simply routers.

    Unroutable messages are discarded by default.

    You can configure the binding to return unroutable messages.

    See Error Channels.

    Returns are asynchronous.

    In the upcoming 3.1 release, you can wait on a future to determine whether the message was sent successfully or not. See Publisher Confirms.

    If the message is unroutable, the correlation data's returnedMessage property is set.
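    The check described above can be sketched in plain Java. This is only a toy model: ToyCorrelation, SendResult and ConfirmCheck are hypothetical names invented for illustration, not Spring or binder classes. It assumes the broker behavior where an unroutable mandatory message is returned first and then still positively confirmed, so the returned message has to be inspected in addition to the ack.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the correlation data; not a Spring class.
class ToyCorrelation {
    // Completed with true (ack) or false (nack) once the broker confirms.
    final CompletableFuture<Boolean> confirm = new CompletableFuture<>();
    // Set before the confirm completes if the broker returned the message.
    volatile String returnedMessage;
}

enum SendResult { DELIVERED, UNROUTABLE, NACKED, UNCONFIRMED }

class ConfirmCheck {

    // Blocks until the confirm arrives (or the timeout expires), then inspects the return.
    static SendResult await(ToyCorrelation correlation, long timeoutMillis) {
        boolean ack;
        try {
            ack = correlation.confirm.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return SendResult.UNCONFIRMED;
        } catch (ExecutionException | TimeoutException e) {
            return SendResult.UNCONFIRMED;
        }
        if (correlation.returnedMessage != null) {
            return SendResult.UNROUTABLE; // an ack alone does not prove the message was routed
        }
        return ack ? SendResult.DELIVERED : SendResult.NACKED;
    }

    public static void main(String[] args) {
        ToyCorrelation correlation = new ToyCorrelation();
        // Simulate the broker on another thread: return first, then a positive confirm.
        CompletableFuture.runAsync(() -> {
            correlation.returnedMessage = "foo";
            correlation.confirm.complete(true);
        });
        System.out.println(await(correlation, 1000));
    }
}
```

    The point of the sketch is the order of the checks: a positive confirm is necessary but not sufficient, so the caller looks at the returned message before treating the send as successful.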

    The framework uses the mandatory feature mentioned in another answer.
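    To make the routing behavior concrete, here is a minimal in-memory sketch of a direct exchange. It is a toy model, not the RabbitMQ client API: ToyDirectExchange, bind, onReturn and publish are hypothetical names. It shows that the exchange stores nothing: a message either reaches the queues bound with a matching key, or it is dropped, or (when published as mandatory) it is handed back to the publisher.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.BiConsumer;

// Toy in-memory model of a direct exchange; NOT the RabbitMQ client API.
class ToyDirectExchange {

    // Binding key -> queues bound with that key.
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Called with (routingKey, body) when a mandatory message cannot be routed.
    private BiConsumer<String, String> returnCallback = (key, body) -> { };

    void bind(String bindingKey, Queue<String> queue) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    void onReturn(BiConsumer<String, String> callback) {
        this.returnCallback = callback;
    }

    // Returns true if at least one queue received the message.
    boolean publish(String routingKey, String body, boolean mandatory) {
        List<Queue<String>> targets = bindings.get(routingKey);
        if (targets == null || targets.isEmpty()) {
            if (mandatory) {
                returnCallback.accept(routingKey, body); // handed back to the publisher
            }
            return false; // either way, the exchange itself keeps nothing
        }
        targets.forEach(queue -> queue.add(body));
        return true;
    }
}

class ToyDirectExchangeDemo {
    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        Queue<String> type1 = new ArrayDeque<>();
        Queue<String> type2 = new ArrayDeque<>();
        exchange.bind("FOO", type1);
        exchange.bind("BAR", type2);

        List<String> returned = new ArrayList<>();
        exchange.onReturn((key, body) -> returned.add(key + ":" + body));

        exchange.publish("FOO", "m1", false); // lands in type1
        exchange.publish("ANY", "m2", false); // silently dropped
        exchange.publish("ANY", "m3", true);  // handed to the return callback

        System.out.println(type1 + " " + type2 + " " + returned);
    }
}
```

    In the question's setup, a message with payload.type='ANY' corresponds to the second and third publish calls: without returns enabled it simply disappears, and with returns enabled the publisher gets it back and can decide where to record it.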

    EDIT

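    Here is an example that enables publisher returns and forwards each returned message to a queue named unroutable.messages: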

    spring.rabbitmq.publisher-returns: true
    
    spring.cloud.stream.bindings.output.destination: my-exchange
    spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
    
    spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
    
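    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.amqp.support.ReturnedAmqpMessageException;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class So65134452Application {

        public static void main(String[] args) {
            SpringApplication.run(So65134452Application.class, args);
        }

        // Sends one message whose routing key matches no binding on my-exchange.
        @Bean
        public ApplicationRunner runner(MessageChannel output) {
            return args -> {
                output.send(MessageBuilder.withPayload("foo")
                        .setHeader("rk", "invalid")
                        .build());
            };
        }

        @Autowired
        RabbitTemplate template;

        // Queue that collects the messages the broker returned as unroutable.
        @Bean
        public Queue unroutable() {
            return new Queue("unroutable.messages");
        }

        // Returned messages arrive on the error channel wrapped in a ReturnedAmqpMessageException.
        @ServiceActivator(inputChannel = "errorChannel")
        public void error(Message<?> error) {
            if (error.getPayload() instanceof ReturnedAmqpMessageException) {
                // Publish to the default exchange; the routing key is the queue name.
                this.template.send(unroutable().getName(),
                        ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
            }
        }

    }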