Search code examples
rabbitmqspring-amqpspring-rabbit

How to read messages destructively from RabbitMq queue using SimpleMessageListenerContainer bean


I need a help to implement a SimpleMessageListenerContainer in a manner that it removes the message, un-transactionally, as soon as read by the message receiver.

In my case, irrespective of successful or unsuccessful transactions, the message put to the queue hangs somewhere (not in queue), and is processed repeatedly on every operation that reads from the queue. So all other messages put to the queue remain unaccessible, and only the first one is re-processed every time.

The other strange thing is that, I can not see the messages landing/queuing on the queue, i.e. the queue depth never changes on the Rabbit management console, but just the message rate taking a jump on every write to the queue.

Below is the code snippet of my Java configuration. It will be of great help if someone can point out the mistake in here:-

@Configuration
@EnableJpaRepositories(basePackages={"com.xxx.muppets"})
public class MuppetMessageConsumerConfig {

private ApplicationContext applicationContext;
@Value("${rabbit.queue.name}")
private String queueName;

@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
}

@Bean
Queue queue() {
    return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("spring-boot-exchange");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(queueName);
}

@Bean
MuppetMsgReceiver muppetMsgReceiver(){
    return new MuppetMsgReceiver();
}

@Bean
MessageListenerAdapter listenerAdapter(MuppetMsgReceiver muppetMsgReceiver){
    return new MessageListenerAdapter(muppetMsgReceiver, "receiveMessage");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setAcknowledgeMode(NONE);
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
}
}

My message receiver class is as below:

public class MuppetMsgReceiver {

private String muppetMessage;

private CountDownLatch countDownLatch;

public MuppetMsgReceiver() {
    this.countDownLatch = new CountDownLatch(1);
}

public MuppetMsgReceiver(CountDownLatch latch) {
    this.countDownLatch = latch;        
    CountDownLatch getCountDownLatch() {
    return countDownLatch;
}

public void receiveMessage(String receivedMessage) {
    countDownLatch.countDown();
    this.muppetMessage = receivedMessage;
}

public String getMuppetMessage() {
    return muppetMessage;
}
}

This complete code is based upon the getting started example of Spring but is not helping due to non-destructive reads from the queue.


Solution

  • AcknowledgeMode=NONE means the broker acks the message as soon as it's sent to the consumer, not when the consumer receives it so your observed behavior makes no sense to me. It is quite unusual to use this ack mode but the message will be gone as soon as it's sent. This is likely a problem with your code. TRACE logging will help you see what's going on.