Search code examples
spring-amqpspring-retryspring-integration-amqp

Fetch message details in Spring RecoveryCallback


I'm publishing messages into RabbitMQ and I would like to track the errors when RabbitMQ is down, for this I added one RetryTemplate with the recovery callback, but the recovery callback only provides this method getLastThrowable() and I'm not sure how to provide the details of the messages that failed when RabbitMQ is down. (as per documentation "The RecoveryCallback is somewhat limited in that the retry context only contains the lastThrowable field. For more sophisticated use cases, you should use an external RetryTemplate so that you can convey additional information to the RecoveryCallback via the context’s attributes") but I don't know how to do that, if anyone could help me with one example that will be awesome.

Rabbit Template

public RabbitTemplate rabbitMqTemplate(RecoveryCallback publisherRecoveryCallback) {
    RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
    r.setExchange(exchangeName);
    r.setRoutingKey(routingKey);
    r.setConnectionFactory(rabbitConnectionFactory);
    r.setMessageConverter(jsonMessageConverter());

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    r.setRetryTemplate(retryTemplate);
    r.setRecoveryCallback(publisherRecoveryCallback);
    return r;
    }

Recovery Callback

@Component
public class PublisherRecoveryCallback implements RecoveryCallback<AssortmentEvent> {
    @Override
    public AssortmentEvent recover(RetryContext context) throws Exception {
        log.error("Error publising event",context.getLastThrowable());
        //how to get message details here??
        return null;
    }
}

AMQP Outbound Adapter

return IntegrationFlows.from("eventsChannel") .split() .handle(Amqp.outboundAdapter(rabbitMqTemplate) .exchangeName(exchangeName) .confirmCorrelationExpression("payload") .confirmAckChannel(ackChannel) .confirmNackChannel(nackChannel) ) .get();


Solution

  • The isn't possible because the function RabbitTemplate.execute() is already not aware about message you send, because it may be performed from any other method, where we might not have messages to deal:

    return this.retryTemplate.execute(
                        (RetryCallback<T, Exception>) context -> RabbitTemplate.this.doExecute(action, connectionFactory),
                        (RecoveryCallback<T>) this.recoveryCallback);
    

    What I suggest you to do is like storing message to the ThreadLocal before send and get it from there from your custom RecoveryCallback.