Taking only specific messages with @RabbitListener


I have a legacy system that sends messages to RabbitMQ. The system uses only one queue, q.finance.invoice, but the queue carries two types of message, and the message type is available in a header.

The first type

Type : invoice.created

{
  "field_1" : "",
  "field_2" : ""
}

The second type

Type : invoice.paid

{
  "field_5" : "",
  "field_6" : ""
}

So now my consumer needs to handle messages selectively based on their type. Spring has @RabbitHandler, which can do this... IF the message is published by Spring. I cannot use the @RabbitHandler annotation, though. I think this is because @RabbitHandler converts the message based on the __TypeId__ header, which does not exist in messages from the legacy system.

How can I simulate this @RabbitHandler behaviour (taking data based on its type)?

So I use @RabbitListener to consume messages. But @RabbitListener takes all types of message. Another reason we use @RabbitListener is that our error handler depends on Message and Channel. The basic method signature we have is like this:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        // convert message body JSON string to object
        // process it
    }

I'm trying to reject manually based on type, which works. But I'm sure it will not scale when I have many listeners or queues:

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice paid : {}", message);
    }

}

See, the point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. So it can go like this for listenInvoicePaid():

  • reject()
  • reject()
  • ack()
  • reject()
  • ack()

In the same way, multiple reject() calls before an ack() can also happen in listenInvoiceCreated().
So in total I can have 10 or so listener calls before all the messages are properly processed.
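
To make the extra calls concrete, here is a toy simulation in plain Java. It does not talk to a broker: the strict round-robin dispatch between the two listeners, the prefetch of one, and the requeue-to-front behaviour are simplifying assumptions, and the class and method names are made up for illustration. Real delivery order depends on prefetch and consumer timing, so the exact count will differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class RequeueSimulation {

    /**
     * Counts listener invocations until every message has been accepted.
     * listenerTypes.get(i) is the only type that listener i accepts.
     * Assumes every message type has at least one listener; otherwise this loops forever.
     */
    public static int countDeliveries(List<String> messages, List<String> listenerTypes) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int deliveries = 0;
        int next = 0; // listener that receives the next delivery (round-robin, an assumption)
        while (!queue.isEmpty()) {
            String type = queue.pollFirst();
            String accepted = listenerTypes.get(next);
            next = (next + 1) % listenerTypes.size();
            deliveries++;
            if (!accepted.equals(type)) {
                // Stands in for channel.basicReject(tag, true): the message goes back to the queue.
                queue.addFirst(type);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        int deliveries = countDeliveries(
                List.of("invoice.paid", "invoice.paid", "invoice.created", "invoice.created"),
                List.of("invoice.created", "invoice.paid"));
        System.out.println("deliveries: " + deliveries);
    }
}
```

Under these assumptions the four messages cost seven listener calls, three of them rejects. Each additional listener on the same queue adds more ways for a message to land on the wrong consumer.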

Any suggestions to fix the code?


Solution

  • You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the JSON body (or, as in this case, the message's type property) and set the __TypeId__ header to the appropriate class name.

    See this answer for an example.
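
A minimal sketch of that idea, assuming a Spring Boot application with spring-boot-starter-amqp and Jackson on the classpath. The package com.example.invoice, the payload classes InvoiceCreated and InvoicePaid, and the class and bean names are hypothetical and not from the question. Package and class names in Spring Boot and Spring AMQP also vary between major versions, so treat the imports as something to check against your own dependencies.

```java
package com.example.invoice; // hypothetical package; must match the trusted package below

import java.util.Locale;
import java.util.Map;

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

// Hypothetical payload classes mirroring the two JSON shapes in the question.
class InvoiceCreated { public String field_1; public String field_2; }
class InvoicePaid { public String field_5; public String field_6; }

@Configuration
class InvoiceRabbitConfig {

    // Maps the legacy "type" property to the class the JSON converter should create.
    private static final Map<String, String> TYPE_TO_CLASS = Map.of(
            "invoice.created", InvoiceCreated.class.getName(),
            "invoice.paid", InvoicePaid.class.getName());

    @Bean
    Jackson2JsonMessageConverter jsonConverter() {
        // The converter only instantiates classes from trusted packages.
        return new Jackson2JsonMessageConverter("com.example.invoice");
    }

    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Applies the spring.rabbitmq.listener.* properties and the converter bean above.
        configurer.configure(factory, connectionFactory);
        factory.setAfterReceivePostProcessors(typeIdPostProcessor());
        return factory;
    }

    private MessagePostProcessor typeIdPostProcessor() {
        return message -> {
            MessageProperties props = message.getMessageProperties();
            String type = props.getType();
            String className = type == null ? null : TYPE_TO_CLASS.get(type.toLowerCase(Locale.ROOT));
            if (className != null) {
                props.setHeader("__TypeId__", className);
                // Assumption: the legacy publisher may not set a JSON content type.
                props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            }
            return message;
        };
    }
}

@Service
@RabbitListener(queues = "q.finance.invoice")
class InvoiceListenerOnClass {

    @RabbitHandler
    public void onCreated(InvoiceCreated invoice) {
        // process invoice.created
    }

    @RabbitHandler
    public void onPaid(InvoicePaid invoice) {
        // process invoice.paid
    }

    @RabbitHandler(isDefault = true)
    public void onUnknown(Object payload) {
        // fallback for messages whose type is not in TYPE_TO_CLASS
    }
}
```

With this in place there is a single consumer on the queue, and each message is dispatched once to the handler whose parameter type matches, so no reject-and-requeue round trips are needed. In a real project each class would normally be public and live in its own file.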