java, rabbitmq, message-queue, amqp

RabbitMQ: Modify message body before consuming


I'm new to RabbitMQ and I want to modify a message before it is delivered to a queue. I have an exchange that must stay untouched. The client receives messages with a specific routing key, but there are a lot of them, and I want to filter them and change the body before they are published into the queue.

The messages published to the exchange look like this:

{
"_context_domain": "unsuitable",
"_msg_id": "1",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "1"
}

{
"_context_domain": "suitable",
"_msg_id": "2",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "2"
}

Is there any way to filter and modify them before consuming? For example:

...
channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "ROUTING_KEY");
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        Gson mapper = new Gson();
        SomeObject object = mapper.fromJson(message, SomeObject.class);
        // Compare strings with equals(); "=" is an assignment
        if ("suitable".equals(object.getContext_domain())) {
            // publish somehow object.getMsg_id() into QUEUE_NAME
        }
    }
};

Is there any way to do it?


Solution

  • Exchanges supported by AMQP do not allow filtering on the message body. You could use "topic" or "headers" exchanges, which route messages based on the routing key or on message headers, respectively.

    However, none of them allows you to modify the message itself. If you want to do that, you need to develop your own RabbitMQ plugin implementing an exchange.

    Going down that road is quite easy. However, exchanges are not exactly designed for this purpose: they are simply routing tables (i.e. an exchange is not a process, it is basically a row in a table). If you do that, the channel process in RabbitMQ will consume resources to perform whatever modifications you want. That also means other clients on the same connection may be blocked until the channel has finished modifying and queuing the message.

    A more common way to achieve what you want is to use a consumer that handles all messages, does whatever filtering and modification you need, and publishes the result to a second queue. Your normal workers then consume from this second queue.
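
    The consumer-plus-second-queue approach could look roughly like the sketch below. It is not a definitive implementation: the names FilteringRelay, Publisher and "filtered_queue" are hypothetical, and the Publisher interface stands in for the broker call so that the sketch runs without a RabbitMQ server. To stay dependency-free it pulls fields out with a small regex; real code should parse the body with Gson, as in the question.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Filters messages taken from the first queue and forwards a reduced body to a second queue. */
final class FilteringRelay {

    /** Hypothetical stand-in for the broker publish call. */
    interface Publisher {
        void publish(String queue, byte[] body) throws IOException;
    }

    private final Publisher publisher;
    private final String targetQueue;

    FilteringRelay(Publisher publisher, String targetQueue) {
        this.publisher = publisher;
        this.targetQueue = targetQueue;
    }

    /** Intended to be called from handleDelivery(...) with the raw body. Returns true if forwarded. */
    boolean handle(byte[] body) throws IOException {
        String json = new String(body, StandardCharsets.UTF_8);
        // Filter: drop every message whose _context_domain is not "suitable".
        if (!"suitable".equals(stringField(json, "_context_domain"))) {
            return false;
        }
        // Modify: keep only the message id as the new body.
        String msgId = stringField(json, "_msg_id");
        if (msgId == null) {
            return false;
        }
        publisher.publish(targetQueue, msgId.getBytes(StandardCharsets.UTF_8));
        return true;
    }

    // Simplification (assumption): reads a flat string field with a regex instead of a JSON library.
    static String stringField(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }
}
```

    With the RabbitMQ Java client, the publisher could be the lambda (queue, body) -> channel.basicPublish("", queue, null, body). Publishing to the default exchange ("") with the queue name as routing key delivers the message straight to that queue. The relay itself would consume from a queue bound to the original exchange, so the exchange stays untouched.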