I am using spring-amqp consumerBatchEnabled to process messages in batch. and for error handling and retry, using Dead letter exchange with spring-retry backoff with recoverer functionality.
If there is any exeception during processing of batch in the main queue I am rejecting the entire batch and then messages are processed individually by Dead letter queue listener. I have configured Advice chain for DLQ listener for retries. Idea is to have fixed number of retries in DLQ listener before discarding the message.
Retry functionality works fine and recoverer is also invoked but message remains in unack state in dead letter queue. As per my understanding message should either be dropped from the queue in case of AmqpRejectAndDontRequeueException, but it does not seem to be happening, Am I missing something here. (This also does not work if I attached DLX)
(With default listener container factory and retry properties in application.yml this works as expected but not with custom listenercontainerfactory)
Below is the test code:
Configuration Class
package com.example.demo;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableRabbit
@Configuration
public class RabbitMQConfig {
@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
DirectExchange exchange() {
return new DirectExchange("person.exchange");
}
@Bean
Queue dlq() {
return QueueBuilder.durable("deadLetter.queue").build();
}
@Bean
Queue queue() {
return QueueBuilder.durable("person.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
.withArgument("x-dead-letter-routing-key", "deadLetter").build();
}
@Bean
Binding DLQbinding() {
return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
}
@Bean
@Autowired
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
String rabbitmq_host = "localhost";
String rabbitmq_port = "5672";
String rabbitmq_user = "guest";
String rabbitmq_pwd = "guest";
factory.setHost(rabbitmq_host);
factory.setPort(Integer.parseInt(rabbitmq_port));
factory.setUsername(rabbitmq_user);
factory.setPassword(rabbitmq_pwd);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1000);
factory.setBatchListener(true);
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(1000l);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1000);
factory.setDefaultRequeueRejected(true);
factory.setAdviceChain(RetryInterceptorBuilder.stateless().backOffOptions(
2000,1,10000).maxAttempts(3).
recoverer(new RejectAndDontRequeueRecoverer()).build());
configurer.configure(factory, connectionFactory);
return factory;
}
}
Listeners
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Component
public class RabbitMQConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myRabbitListener", queues = "person.queue", containerFactory = "rabbitListenerContainerFactory")
public void getMessage(List<Message<Person>> messages, Channel channel) throws InvalidPersonException, IOException {
System.out.println(messages.size());
long tag = (long)messages.get(messages.size()-1).getHeaders().get((AmqpHeaders.DELIVERY_TAG));
channel.basicNack(tag, true, false);
/* System.out.println(Thread.currentThread());
if(message.getPayload().getName().equals("test")) {
channel.basicNack(tag, false, false);
}*/
}
}
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;
import static java.util.concurrent.TimeUnit.SECONDS;
@Component
public class DlqConsumer {
List<String> list = new ArrayList<>();
private int i = 0;
@RabbitListener(id = "myDlqListener", queues = "deadLetter.queue", containerFactory = "rabbitDlqListenerContainerFactory")
public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
System.out.println(message.getPayload().getName());
long tag = (long)message.getHeaders().get((AmqpHeaders.DELIVERY_TAG));
System.out.println(Thread.currentThread());
throw new InvalidPersonException();
}
}
Exception Class:
package com.example.demo;
public class InvalidPersonException extends Exception{
private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;
import java.io.Serializable;
public class Person implements Serializable {
private Long Id;
private String name;
public Person(){
}
public Person(Long id, String name) {
super();
Id = id;
this.name = name;
}
public Long getId() {
return Id;
}
public void setId(Long id) {
Id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
build.gradle
buildscript {
ext {
springBootVersion = '2.2.10.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
implementation 'junit:junit:4.12'
compile('org.springframework.boot:spring-boot-starter-amqp:2.4.8')
compile('org.springframework.boot:spring-boot-starter-web:2.2.10.RELEASE')
testCompile('org.springframework.boot:spring-boot-starter-test')
compile "io.springfox:springfox-swagger2:2.7.0"
compile "io.springfox:springfox-swagger-ui:2.7.0"
testCompile "org.springframework.amqp:spring-rabbit-test:2.3.6"
}
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
When you do manual acks, you are responsible for all Acks/Nacks.
Use the default mode (AUTO
).