Search code examples
javaspringspring-cloud-streammessagebrokerdead-letter

Handling dead-letter queue message-broker independent way


I have a project that currently uses Spring Cloud Streams and RabbitMQ underneath. I've implemented a logic based on the documentation. See below:

@Component
public class ReRouteDlq {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

    private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

It does what it is expected, however, it is binded to RabbitMQ, and my company is planning to stop using this message broker in one year or two (don't know why, must be some crazy business). So, I want to implement the same thing, but detach it from any message broker.

I tried changing the rePublish method this way, but it does not work:

    @StreamListener(Sync.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

It fails because the Message class has immutable Headers - throws exception on the put attempt saying you can't change its values (uses org.springframework.messaging.Message class).

Is there a way to implement this dead-letter queue handler in a message broker independent way?


Solution

  • Use

    MessageBuilder.fromMessage(message)
        .setHeader("foo", "bar")
         ...
        .build();
    

    Note that the message in @StreamListener is a spring-messaging Message<?>, not a spring-amqp Message and can't be sent using the template that way; you need an output binding to send the message to.