I have a legacy system that sends messages to RabbitMQ.
The system uses only one queue: q.finance.invoice
but the queue carries two types of message, and the message type is available in a header.
The first type:
Type: invoice.created
{
  "field_1": "",
  "field_2": ""
}
The second type:
Type: invoice.paid
{
  "field_5": "",
  "field_6": ""
}
So now my consumer needs to handle each message selectively, based on its type.
Spring has @RabbitHandler, which makes this possible... IF the message is published by Spring.
I cannot use the @RabbitHandler annotation, though.
I think this is because @RabbitHandler converts the message based on the __TypeId__ header, which does not exist in messages from the legacy system.
How can I simulate this @RabbitHandler behaviour (handling data based on its type)?
So I use @RabbitListener to consume messages.
But @RabbitListener takes all types of message.
Another reason we use @RabbitListener is that our error handler depends on Message and Channel.
The basic method signature we have looks like this:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    // convert message body JSON string to object
    // process it
}
I'm trying to reject manually based on type, which works. But I'm sure it will not scale when I have many listeners or queues:
import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }
        log.info("[on Method] Listening invoice paid : {}", message);
    }
}
See, the point is that when I have 4 messages (paid-paid-created-created), the listeners can run more than 4 times, because we cannot control which listener takes which message. So listenInvoicePaid() can reject a message several times before it receives one it can ack.
In the same way, multiple rejects before an ack can also happen in listenInvoiceCreated().
So in total I can end up with around 10 listener calls before all the messages are properly processed.
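To make the effect concrete, here is a rough model of the reject-and-requeue loop. It is only a sketch under simplifying assumptions that are not guaranteed for a real broker: deliveries alternate strictly between the two listeners, a rejected message goes back to the head of the queue, and every message type has a listener that accepts it. The class name RequeueSimulation is made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model; it does not talk to RabbitMQ.
class RequeueSimulation {

    /**
     * Returns {deliveries, rejects} for the given queue contents.
     * consumerTypes lists the message type each listener accepts.
     * Assumes every message type has at least one accepting listener,
     * otherwise the loop would never finish.
     */
    static int[] simulate(List<String> messages, List<String> consumerTypes) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int deliveries = 0;
        int rejects = 0;
        int next = 0; // index of the listener that gets the next delivery
        while (!queue.isEmpty()) {
            String message = queue.pollFirst();
            String accepted = consumerTypes.get(next);
            next = (next + 1) % consumerTypes.size();
            deliveries++;
            if (!accepted.equals(message)) {
                rejects++;
                queue.addFirst(message); // reject with requeue
            }
            // otherwise the message is acked and leaves the queue
        }
        return new int[] {deliveries, rejects};
    }

    public static void main(String[] args) {
        int[] result = simulate(
                List.of("paid", "paid", "created", "created"),
                List.of("created", "paid"));
        System.out.println(result[0] + " deliveries, " + result[1] + " rejects");
    }
}
```

Under these assumptions the 4 messages need 7 deliveries, 3 of which are rejects. The real number depends on prefetch and on how the broker distributes messages, so it can be higher.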
Any suggestions on how to fix the code?
You can add a MessagePostProcessor to the container factory's afterReceivePostProcessors property. In the post processor, you can examine the message (its type property or its JSON body) and set the __TypeId__ header to the appropriate class name.
See this answer for an example.
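As a rough illustration of the mapping step only, the sketch below uses plain Java with no Spring dependency and shows how the legacy type value could be turned into a __TypeId__ header. The class name TypeIdHeaderMapper and the com.example.* payload class names are assumptions made up for this example; substitute your own classes.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: maps the legacy "type" value to the class name
// that the __TypeId__ header should carry.
class TypeIdHeaderMapper {

    static final String TYPE_ID_HEADER = "__TypeId__";

    // Assumed payload classes; replace with your own fully qualified names.
    private static final Map<String, String> CLASS_BY_TYPE = Map.of(
            "invoice.created", "com.example.InvoiceCreatedMessage",
            "invoice.paid", "com.example.InvoicePaidMessage");

    /** Looks up the class name for a legacy type, ignoring case. */
    static Optional<String> classNameFor(String legacyType) {
        if (legacyType == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(CLASS_BY_TYPE.get(legacyType.toLowerCase(Locale.ROOT)));
    }

    /** Returns a copy of the headers with __TypeId__ added when the type is known. */
    static Map<String, Object> withTypeId(String legacyType, Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>(headers);
        classNameFor(legacyType).ifPresent(name -> result.put(TYPE_ID_HEADER, name));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(withTypeId("invoice.paid", Map.of()));
    }
}
```

Inside a real MessagePostProcessor, the same step would read message.getMessageProperties().getType(), call setHeader("__TypeId__", className) on those properties, and return the message. Unknown types are left without the header here; whether to reject or dead-letter them instead is up to you.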