Search code examples
javaspringjunitspring-amqpspring-rabbit

Spring AMQP rabbitmq retry backoff with recover do not dequeue messages from DLQ


I am using spring-amqp consumerBatchEnabled to process messages in batch. and for error handling and retry, using Dead letter exchange with spring-retry backoff with recoverer functionality.

If there is any exeception during processing of batch in the main queue I am rejecting the entire batch and then messages are processed individually by Dead letter queue listener. I have configured Advice chain for DLQ listener for retries. Idea is to have fixed number of retries in DLQ listener before discarding the message.

Retry functionality works fine and recoverer is also invoked but message remains in unack state in dead letter queue. As per my understanding message should either be dropped from the queue in case of AmqpRejectAndDontRequeueException, but it does not seem to be happening, Am I missing something here. (This also does not work if I attached DLX)

(With default listener container factory and retry properties in application.yml this works as expected but not with custom listenercontainerfactory)

Below is the test code:

Configuration Class

package com.example.demo;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitMQConfig {


    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange("person.exchange");
    }

    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable("person.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter").build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
    }

    @Bean
    @Autowired
    public ConnectionFactory connectionFactory() {
         CachingConnectionFactory factory = new CachingConnectionFactory();

        String rabbitmq_host = "localhost";
        String rabbitmq_port = "5672";
        String rabbitmq_user = "guest";
        String rabbitmq_pwd = "guest";

        factory.setHost(rabbitmq_host);
        factory.setPort(Integer.parseInt(rabbitmq_port));
         factory.setUsername(rabbitmq_user);
        factory.setPassword(rabbitmq_pwd);

        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1000);
        factory.setBatchListener(true);
        factory.setBatchSize(2);
        factory.setConsumerBatchEnabled(true);
        factory.setReceiveTimeout(1000l);
        configurer.configure(factory, connectionFactory);
        return factory;
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1000);
        factory.setDefaultRequeueRejected(true);
        factory.setAdviceChain(RetryInterceptorBuilder.stateless().backOffOptions(
                2000,1,10000).maxAttempts(3).
                recoverer(new RejectAndDontRequeueRecoverer()).build());
       configurer.configure(factory, connectionFactory);
        return factory;
    }
}

Listeners

package com.example.demo;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Component
public class RabbitMQConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    @RabbitListener(id = "myRabbitListener", queues = "person.queue", containerFactory = "rabbitListenerContainerFactory")
    public void getMessage(List<Message<Person>> messages, Channel channel) throws InvalidPersonException, IOException {
        System.out.println(messages.size());
        long tag = (long)messages.get(messages.size()-1).getHeaders().get((AmqpHeaders.DELIVERY_TAG));
        channel.basicNack(tag, true, false);
        /* System.out.println(Thread.currentThread());
        if(message.getPayload().getName().equals("test")) {
            channel.basicNack(tag, false, false);
        }*/
    }
}
package com.example.demo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;

import static java.util.concurrent.TimeUnit.SECONDS;

@Component
public class DlqConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    @RabbitListener(id = "myDlqListener", queues = "deadLetter.queue", containerFactory = "rabbitDlqListenerContainerFactory")
    public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
            System.out.println(message.getPayload().getName());
            long tag = (long)message.getHeaders().get((AmqpHeaders.DELIVERY_TAG));
            System.out.println(Thread.currentThread());
           throw new InvalidPersonException();
    }
}

Exception Class:

package com.example.demo;

public class InvalidPersonException extends Exception{
    private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;

import java.io.Serializable;

public class Person implements Serializable {
    private Long Id;

    private String name;

    public Person(){

    }

    public Person(Long id, String name) {
        super();
        Id = id;
        this.name = name;
    }

    public Long getId() {
        return Id;
    }

    public void setId(Long id) {
        Id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

build.gradle

buildscript {
    ext {
        springBootVersion = '2.2.10.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    implementation 'junit:junit:4.12'
    compile('org.springframework.boot:spring-boot-starter-amqp:2.4.8')
    compile('org.springframework.boot:spring-boot-starter-web:2.2.10.RELEASE')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    compile "io.springfox:springfox-swagger2:2.7.0"
    compile "io.springfox:springfox-swagger-ui:2.7.0"
    testCompile "org.springframework.amqp:spring-rabbit-test:2.3.6"
}

Solution

  • factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    When you do manual acks, you are responsible for all Acks/Nacks.

    Use the default mode (AUTO).