Search code examples
apache-cameljmsactivemq-classicintegrationmessaging

Camel in/out messages and correlation ID confusion


I'm confused how Camel automatically forms messages and correlation identifiers, when using message queues.

For an example, I have two service with following routes

<route id="cxfToJMSRoute">

        <from uri="cxf:bean:endpoint" />
        <convertBodyTo type="String" />

        <log message="..." />
        <log message="In message: ${in.body}" />
        <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
        <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

        <log message="Out message: ${out.body}" />
        <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
        <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />

        <log message="Invoking service A by sending InOut message to api queue" />
        <to uri="activemq:queue:api?replyTo=result&amp;replyToType=Exclusive"
            pattern="InOut" />

        <log message="..." />
        <log message="Got message from Service A" />
        <log message="In message: ${in.body}" />
        <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
        <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

        <log message="Out message: ${out.body}" />
        <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
        <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />

        <!-- Correlation ID? -->
        <pollEnrich>
            <constant>activemq:queue:reply</constant>
        </pollEnrich>

        <log message="..." />
        <log message="Got message from Service B" />
        <log message="In message: ${in.body}" />
        <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
        <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

        <log message="Out message: ${out.body}" />
        <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
        <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />

        <transform>
            <simple>${in.body}</simple>
        </transform>

        <log message="After transform:" />
        <log message="In message: ${in.body}" />
        <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
        <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

        <log message="Out message: ${out.body}" />
        <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
        <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />
        <log message="..." />
    </route>

--

<route id="apiToServiceARoute">
      <from uri="activemq:queue:api"/>

      <log message="..." />
      <log message="In message: ${in.body}" />
      <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
      <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

      <log message="Out message: ${out.body}" />
      <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
      <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />

      <log message="Mocking service B by sending InOnly message to reply queue" />
      <to uri="activemq:queue:reply?exchangePattern=InOnly" pattern="InOnly" />

      <transform>
        <simple>Delegated</simple>
      </transform>

      <log message="..." />
      <log message="After transform:" />
      <log message="In message: ${in.body}" />
      <log message="In JMSMessageID: ${in.header.JMSMessageID}" />
      <log message="In JMSCorrelationID: ${in.header.JMSCorrelationID}" />

      <log message="Out message: ${out.body}" />
      <log message="Out JMSMessageID: ${out.header.JMSMessageID}" />
      <log message="Out JMSCorrelationID: ${out.header.JMSCorrelationID}" />
      <log message="..." />
  </route>

Here are my observations according to log messages

  1. OUT message is always empty. Does Camel always use IN by default, until OUT is created by some processor excliptly?
  2. JMSMessageID and JMSCorrelationID doesn't seem to have any connection. Does Camel only use JMSCorrelationID for correlating messages?
  3. Camel manages to retain the JMSCorrelationID when sending InOnly message to another queue (reply) where the request originates from. This is desirable, but how does it actually do it? What are the specific rules of retaining the JMSCorrelationID?

And here are the actual logs of both example routes

2017-12-03 12:16:06.197  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : ...
2017-12-03 12:16:06.197  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : In message: testi2
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : In JMSMessageID:
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : In JMSCorrelationID:
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : Out message:
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : Out JMSMessageID:
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : Out JMSCorrelationID:
2017-12-03 12:16:06.198  INFO 13780 --- [qtp335580595-36] cxfToJMSRoute                            : Invoking service A by sending InOut message to api queue
2017-12-03 12:16:06.569  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : ...
2017-12-03 12:16:06.570  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Got message from Service A
2017-12-03 12:16:06.570  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In message: Delegated
2017-12-03 12:16:06.582  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSMessageID: ID:DESKTOP-LI5P50P-54036-1512296151920-1:1:1:1:1
2017-12-03 12:16:06.582  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSCorrelationID: Camel-ID-DESKTOP-LI5P50P-1512296077166-0-2
2017-12-03 12:16:06.582  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out message:
2017-12-03 12:16:06.582  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSMessageID:
2017-12-03 12:16:06.583  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSCorrelationID:
2017-12-03 12:16:06.596  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : ...
2017-12-03 12:16:06.596  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Got message from Service B
2017-12-03 12:16:06.596  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In message: testi2
2017-12-03 12:16:06.597  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSMessageID: ID:DESKTOP-LI5P50P-54036-1512296151920-3:1:1:1:1
2017-12-03 12:16:06.597  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSCorrelationID: Camel-ID-DESKTOP-LI5P50P-1512296077166-0-2
2017-12-03 12:16:06.597  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out message:
2017-12-03 12:16:06.597  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSMessageID:
2017-12-03 12:16:06.597  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSCorrelationID:
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : After transform:
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In message: testi2
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSMessageID: ID:DESKTOP-LI5P50P-54036-1512296151920-3:1:1:1:1
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : In JMSCorrelationID: Camel-ID-DESKTOP-LI5P50P-1512296077166-0-2
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out message:
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSMessageID:
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : Out JMSCorrelationID:
2017-12-03 12:16:06.598  INFO 13780 --- [Manager[result]] cxfToJMSRoute                            : ...

--

2017-12-03 12:16:06.528  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : ...
2017-12-03 12:16:06.531  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In message: testi2
2017-12-03 12:16:06.534  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In JMSMessageID: ID:DESKTOP-LI5P50P-54046-1512296166309-1:1:2:1:1
2017-12-03 12:16:06.534  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In JMSCorrelationID: Camel-ID-DESKTOP-LI5P50P-1512296077166-0-2
2017-12-03 12:16:06.534  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out message:
2017-12-03 12:16:06.535  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out JMSMessageID:
2017-12-03 12:16:06.535  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out JMSCorrelationID:
2017-12-03 12:16:06.535  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Mocking service B by sending InOnly message to reply queue
2017-12-03 12:16:06.558  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : ...
2017-12-03 12:16:06.558  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : After transform:
2017-12-03 12:16:06.558  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In message: Delegated
2017-12-03 12:16:06.558  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In JMSMessageID: ID:DESKTOP-LI5P50P-54046-1512296166309-1:1:2:1:1
2017-12-03 12:16:06.559  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : In JMSCorrelationID: Camel-ID-DESKTOP-LI5P50P-1512296077166-0-2
2017-12-03 12:16:06.559  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out message:
2017-12-03 12:16:06.559  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out JMSMessageID:
2017-12-03 12:16:06.559  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : Out JMSCorrelationID:
2017-12-03 12:16:06.559  INFO 16972 --- [msConsumer[api]] apiToServiceARoute                       : ...

Solution

    1. OUT message is always empty. Does Camel always use IN by default, until OUT is created by some processor explicitly?

    Yes. Cloning of whole org.apache.camel.Message is really expensive. OUT is created on demand, during first call of Exchange#getOut

    See Apache Camel: Message exchange patterns and the Exchange object


    1. JMSMessageID and JMSCorrelationID doesn't seem to have any connection. Does Camel only use JMSCorrelationID for correlating messages?

    Not exactly. camel-jms (and camel-activemq, which is extending it) component copies header JMSCorrelationID to header CamelCorrelationId.

    Correlation in pollEnrich is again working with CamelCorrelationId and camel-jms translates this header as message selector JMSCorrelationID=CamelCorrelationId.

    So Yes, while communicating between camel-jms (and inherited components) endpoints and exchangePattern=InOnly, correlation will be done effectively on header JMSCorrelationID.

    See Apache Camel: Correlation Identifier


    1. Camel manages to retain the JMSCorrelationID when sending InOnly message to another queue (reply) where the request originates from. This is desirable, but how does it actually do it? What are the specific rules of retaining the JMSCorrelationID?

    Camel does no transformation of JMSCorrelationID when using exchangePattern=InOnly.

    On input route, Camel gets correlation id from input javax.jms.Message and saves it to org.apache.camel.Message#headers.JMSCorrelationId.

    On output route Camel gets correlation id from org.apache.camel.Message#headers.JMSCorrelationId and sets it to javax.jms.Message.setJMSCorrelationID().

    Note: I am not developer of Apache Camel. Every word I have written in this answer is based on user experience while using this framework. There could be minor discrepancies in answer.