rabbitmq, spring-amqp

Spring AMQP custom TTL and retry count


We are trying to implement a retry mechanism for client exceptions. We want to be able to set a different routing key, TTL, and retry count based on the content of each message. We want to keep the handler simple, i.e., handleMessage just throws an exception. How do we handle this exception and send the message to the DLX with the appropriate parameters? If the failure happens again on retry, the message should either be discarded (acknowledged) or put back on the DLX with an incremented retry count. Where would we implement this logic, and how would it be wired?

========================

With Gary's direction, I was able to implement this. Here are excerpts:

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    jsonMessageHandler.setQueueName(queueName);
    container.setQueueNames(queueName);
    container.setMessageListener(jsonMessageListenerAdapter());
    container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});

    return container;
}

@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(messageHandler,messageConverter);
}

@Bean
public MessageListenerAdapter jsonMessageListenerAdapter() {
    return new MessageListenerAdapter(jsonMessageHandler);
}


@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
    // maxAttempts(1): no in-process retries; the recoverer runs after the first failure.
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(1)
            .recoverer(republishMessageRecoverer())
            .build();
}

@Bean
RepublishMessageRecoverer republishMessageRecoverer() {
    return new MyRepublishMessageRecoverer(rabbitTemplate());
}

==========

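public class MyRepublishMessageRecoverer extends RepublishMessageRecoverer {

    // - constructor

    @Override
    public void recover(Message message, Throwable cause) {
        // Deal with headers

        long currentCount = 0;

        // The broker adds an x-death entry each time the message is dead-lettered.
        List<?> xDeathList = (List<?>) message.getMessageProperties().getHeaders().get("x-death");
        if (xDeathList != null && !xDeathList.isEmpty()) {
            // Check the type before converting so a missing "count" cannot cause a NullPointerException.
            Object count = ((Map<?, ?>) xDeathList.get(0)).get("count");
            if (count instanceof Number) {
                currentCount = ((Number) count).longValue();
            }
        }

        if (currentCount < context.getRules().getNumberOfRetries()) {
            // Retries remain: message sent to DLX
            this.retryTemplate.send(handlerProperties.getSystem(), message);
        } else {
            // Retries exhausted: message ignored
        }

        // Reject the original delivery without requeueing it.
        throw new AmqpRejectAndDontRequeueException(cause);
    }

}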
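
The retry-count check in recover() can also be pulled out into a small helper that depends only on the header map, which makes it easy to unit test without a broker. The following is a minimal sketch, not part of the original code: the class name XDeathRetryCounter and its methods are hypothetical, and it assumes, as the excerpt above does, that the first x-death entry carries the relevant "count" value.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: decides whether a failed message still has retries left. */
final class XDeathRetryCounter {

    private XDeathRetryCounter() { }

    /**
     * Returns the "count" field of the first x-death entry, or 0 when the header
     * is absent, empty, or not shaped as expected.
     */
    static long deathCount(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return 0L;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (!(first instanceof Map)) {
            return 0L;
        }
        Object count = ((Map<?, ?>) first).get("count");
        // Accept any numeric type instead of casting straight to Long.
        return (count instanceof Number) ? ((Number) count).longValue() : 0L;
    }

    /** True while the message has been dead-lettered fewer than maxRetries times. */
    static boolean shouldRetry(Map<String, Object> headers, long maxRetries) {
        return deathCount(headers) < maxRetries;
    }
}
```

Inside recover(), the check would then read something like XDeathRetryCounter.shouldRetry(message.getMessageProperties().getHeaders(), maxRetries), where maxRetries comes from whatever per-message rules you use.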


Solution

  • You can't modify a rejected message; it is routed to the DLX/DLQ unchanged (except that the broker adds x-death headers).

    You have to republish to the DLX/DLQ yourself if you want to change message properties.

    You can use Spring Retry with a customized RepublishMessageRecoverer to do this.
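
Because the republish is done by your own code, the routing key and TTL can be chosen per message before sending. The sketch below isolates that decision in plain Java so it stays independent of the broker. RetryRule and RepublishPlan are hypothetical names, not Spring AMQP types. It relies on the AMQP convention that a per-message TTL is carried in the expiration property as a string of milliseconds, which in Spring AMQP is set with MessageProperties.setExpiration(String).

```java
/** Hypothetical per-message rule; in the question these values are derived from the message content. */
final class RetryRule {
    final String routingKey;
    final long ttlMillis;
    final long maxRetries;

    RetryRule(String routingKey, long ttlMillis, long maxRetries) {
        this.routingKey = routingKey;
        this.ttlMillis = ttlMillis;
        this.maxRetries = maxRetries;
    }
}

/** Hypothetical result type: what the recoverer should do with one failed message. */
final class RepublishPlan {
    final boolean republish;
    final String routingKey;  // null when the message is dropped
    final String expiration;  // per-message TTL in milliseconds, as a string; null when dropped

    private RepublishPlan(boolean republish, String routingKey, String expiration) {
        this.republish = republish;
        this.routingKey = routingKey;
        this.expiration = expiration;
    }

    /** Republish with the rule's routing key and TTL while retries remain; otherwise drop. */
    static RepublishPlan forFailure(RetryRule rule, long previousDeaths) {
        if (previousDeaths >= rule.maxRetries) {
            return new RepublishPlan(false, null, null);
        }
        return new RepublishPlan(true, rule.routingKey, Long.toString(rule.ttlMillis));
    }
}
```

In a customized RepublishMessageRecoverer, a plan with republish set to true would translate into setting the expiration on the message properties and sending with the chosen routing key. A plan with republish set to false means the message is simply not republished.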