java rabbitmq spring-rabbit

How to send a correlation ID in a message header from the sender and retrieve it on the receiver with RabbitMQ in Java


I am using RabbitMQ to send and receive JSON messages.
On the sender side, I use a RabbitTemplate instance and its convertAndSend method to publish a message to the exchange, as below:

rabbitTemplate.convertAndSend(exchangeNameOut, message.getString(PERSISTENCE_MESSAGE_ROUTING_KEY),
                    message.getString(PERSISTENCE_MESSAGE_BODY), new CorrelationData(""+analyticRuntime.getId()));

To receive the message, I have implemented the following listener method:

    @RabbitListener(queues = "${rabbit.queue.mail.name}", containerFactory = "rabbitListenerContainerFactory")
    public void processMailMessage(Message message) {
        log.info("ENTER [processMailMessage]  ");

        Mail mail;

        JSONObject messageBody = new JSONObject(new String(message.getBody()));

        String asset = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_ASSET_ID_KEY);

        String body = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_EVENT_KEY);

        String alarms = "";
        // This logs null, even though CorrelationData was passed to convertAndSend.
        log.info("[processMailMessage] message.getMessageProperties().getCorrelationId() : "
                + message.getMessageProperties().getCorrelationId());
    }

My questions are:

  • Why is message.getMessageProperties().getCorrelationId() null? I passed the correlation data to the send method.
  • Is the correlation ID the same thing as the CorrelationData that I passed to convertAndSend?
  • How can I retrieve the correlation ID in the receiver method?

Thanks


Solution

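  • No; CorrelationData is for correlating publisher confirms with a send; it has nothing to do with the correlationId message property, which is why getCorrelationId() returns null on the receiver.

    To set the correlationId property on the outgoing message, use a MessagePostProcessor: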

    rabbitTemplate.convertAndSend(exchangeNameOut, message.getString(PERSISTENCE_MESSAGE_ROUTING_KEY),
                    message.getString(PERSISTENCE_MESSAGE_BODY), 
        m -> {
            m.getMessageProperties().setCorrelationIdString(""+analyticRuntime.getId());
            return m;
        });
    

    If you are not using Java 8, use new MessagePostProcessor() { ... } in place of the lambda.
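
To make the distinction concrete, here is a minimal sketch that runs with the JDK alone. Every type in it (Message, MessageProperties, MessagePostProcessor, CorrelationData) is a hypothetical stand-in written for this example, not the real Spring AMQP class, and the two sendWith... methods are assumptions that only imitate what convertAndSend does to the message. It shows that correlation data passed at send time never reaches the message properties, while a post-processor (written here in the anonymous-class form) sets the header that the receiver later reads.

```java
import java.nio.charset.StandardCharsets;

// Self-contained sketch: every type below is a hypothetical stand-in for the
// Spring AMQP class of the same name, reduced to what the example needs.
class CorrelationIdSketch {

    /** Stand-in for MessageProperties; models only the correlationId header. */
    static final class MessageProperties {
        private String correlationId;
        String getCorrelationId() { return correlationId; }
        void setCorrelationId(String correlationId) { this.correlationId = correlationId; }
    }

    /** Stand-in for Message: a body plus its properties. */
    static final class Message {
        private final byte[] body;
        private final MessageProperties properties = new MessageProperties();
        Message(byte[] body) { this.body = body; }
        byte[] getBody() { return body; }
        MessageProperties getMessageProperties() { return properties; }
    }

    /** Stand-in for MessagePostProcessor: edits the message just before it is sent. */
    interface MessagePostProcessor {
        Message postProcessMessage(Message message);
    }

    /** Stand-in for CorrelationData: an ID the sender keeps to match publisher confirms. */
    static final class CorrelationData {
        private final String id;
        CorrelationData(String id) { this.id = id; }
        String getId() { return id; }
    }

    /** Simulated send with CorrelationData only: the ID stays on the sender side. */
    static Message sendWithCorrelationData(String payload, CorrelationData data) {
        // 'data' is deliberately not copied into the message properties.
        return new Message(payload.getBytes(StandardCharsets.UTF_8));
    }

    /** Simulated send with a post-processor, which can set headers on the message. */
    static Message sendWithPostProcessor(String payload, MessagePostProcessor processor) {
        Message message = new Message(payload.getBytes(StandardCharsets.UTF_8));
        return processor.postProcessMessage(message);
    }

    public static void main(String[] args) {
        Message first = sendWithCorrelationData("{}", new CorrelationData("42"));
        System.out.println(first.getMessageProperties().getCorrelationId()); // prints "null"

        // Anonymous-class form, for code bases without lambdas.
        Message second = sendWithPostProcessor("{}", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) {
                message.getMessageProperties().setCorrelationId("42");
                return message;
            }
        });
        System.out.println(second.getMessageProperties().getCorrelationId()); // prints "42"
    }
}
```

On the receiver, nothing else should be needed: once the sender sets the property, the existing message.getMessageProperties().getCorrelationId() call in the listener should return it. The stand-in setter is String-typed and named setCorrelationId; as far as I recall that matches Spring AMQP 2.x, while setCorrelationIdString is the older 1.x-era name, so check which one your version provides.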