java spring rabbitmq spring-rabbit

RabbitMQ: adding a header property to each message automatically


I am using Java 21 with Spring Boot and RabbitMQ 3.9.28, and I have many components that send messages to RabbitMQ. I need to add a new property to the message header: a timestamp of when the message was rejected because of an exception in the message handler. I need it because I have hundreds of messages in different error queues and I don't know when each exception occurred. I cannot see any timestamp in the message header or body, so I cannot navigate to the proper log file and correlate the message with the logs. Is there an interceptor abstraction I could use, or are there examples of how to add a timestamp when handling of a specific message fails?

EDIT: I am not catching any errors in the message handler logic. I also don't have any fancy error-handling logic around the queues, and I don't want any. I would need to reimplement a huge amount of code and configuration because there are so many handlers and message types.

    @Bean
    public Queue myQueue() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put(X_DEAD_LETTER_EXCHANGE, "");
        parameters.put(X_DEAD_LETTER_ROUTING_KEY, myName + ERRORS_POSTFIX);
        return new Queue(myName, true, false, false, parameters);
    }

    @Bean
    public Queue myErrorQueue(Queue myQueue) {
        Map<String, Object> parameters = new HashMap<>();
        return new Queue(myQueue.getName() + ERRORS_POSTFIX, true, false, false, parameters);
    }

Solution

  • Right. You are using the standard RabbitMQ DLX mechanism, which runs entirely on the broker: when the application rejects (nacks) the message, the broker sends that same message to the DLX, and the application has no opportunity to modify it.

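    You might consider looking at the x-death header, which the broker adds to a message when it dead-letters it. Each entry in that header includes a time field recording when the message was dead-lettered, so it should already be present on the messages in your error queues.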

    See the respective docs for more info: https://www.rabbitmq.com/docs/dlx
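For messages that are already sitting in an error queue, a small helper along these lines could read the dead-lettering time from the x-death header. This is only a minimal sketch: the class and method names are made up for illustration, and it assumes the headers arrive the way the RabbitMQ Java client and Spring AMQP usually expose them, that is "x-death" as a list of maps (most recent entry first) with "time" decoded as a java.util.Date.

```java
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustrative helper (not part of any library) for reading the x-death time. */
final class DeathTimestamps {

    private DeathTimestamps() {
    }

    /**
     * Returns the "time" field of the first x-death entry, if there is one.
     * Assumption: "x-death" is a List of Map entries and "time" is a java.util.Date.
     * Anything that does not match this shape yields an empty Optional.
     */
    static Optional<Instant> firstDeathTime(Map<String, Object> headers) {
        if (headers.get("x-death") instanceof List<?> entries
                && !entries.isEmpty()
                && entries.get(0) instanceof Map<?, ?> entry
                && entry.get("time") instanceof Date time) {
            return Optional.of(time.toInstant());
        }
        return Optional.empty();
    }
}
```

In a Spring AMQP listener the map to pass in would typically be message.getMessageProperties().getHeaders(); the same time value is also visible in the management UI when you inspect a message in the error queue.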

    Another way is to republish the message manually, which lets you add extra information because the republished message is a new one: https://docs.spring.io/spring-amqp/reference/amqp/resilience-recovering-from-errors-and-broker-failures.html#async-listeners.
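If you go the republishing route, the extra headers could be built by something like the sketch below. The header names (x-failed-at, x-failed-with) and the FailureHeaders class are hypothetical, chosen only for illustration. The Spring wiring is shown as a comment so that the block compiles without Spring on the classpath; it assumes a RepublishMessageRecoverer subclass that overrides the additionalHeaders hook.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper (not part of Spring AMQP) that builds failure headers. */
final class FailureHeaders {

    // Hypothetical header names; pick whatever fits your conventions.
    static final String FAILED_AT = "x-failed-at";
    static final String FAILED_WITH = "x-failed-with";

    private FailureHeaders() {
    }

    /** Builds headers with an ISO-8601 UTC timestamp and the root cause's class name. */
    static Map<String, Object> forFailure(Throwable cause, Instant failedAt) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(FAILED_AT, failedAt.toString());
        headers.put(FAILED_WITH, rootCause(cause).getClass().getName());
        return headers;
    }

    // Listener exceptions are usually wrapped, so walk down to the innermost cause.
    private static Throwable rootCause(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }
}

// Assumed wiring inside a RepublishMessageRecoverer subclass:
//
//   @Override
//   protected Map<? extends String, ?> additionalHeaders(Message message, Throwable cause) {
//       return FailureHeaders.forFailure(cause, Instant.now());
//   }
```

Note that a recoverer only runs once listener retries are exhausted, so it needs a retry interceptor on the listener container. With Spring Boot, as far as I know, that means enabling listener retry (for example spring.rabbitmq.listener.simple.retry.enabled=true) and exposing the recoverer as a MessageRecoverer bean; check the docs for your version. The handlers themselves would not need to change.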

    The method to override is RepublishMessageRecoverer.additionalHeaders():

    /**
     * Subclasses can override this method to add more headers to the republished message.
     * @param message The failed message.
     * @param cause The cause.
     * @return A {@link Map} of additional headers to add.
     */
    protected Map<? extends String, ?> additionalHeaders(Message message, Throwable cause) {