jms, spring-integration, ibm-mq

Retaining message id between 2 MQ brokers using spring integration


I am using a message workflow made of message-driven-channel-adapter => channel => outbound-channel-adapter. Its purpose is to transport messages from one MQSeries broker to another MQSeries broker. It is transactional (acknowledgement required).

The relevant part is below (some parts are obviously missing; if you think they are required, I will edit my post and add them).

My problem is about message headers, specifically the msgId. When I put a message with a messageId on the inbound queue, I expect it to remain the same through the whole pipeline.
Instead, the messageId is changed on the outbound queue: its content is replaced by a generated ID that includes the outbound queue manager name.

From the emitter (this is only an example of possible emitting code; I have the same problem with every code I used, as long as I provide a msgId):

com.ibm.mq.MQMessage message = new MQMessage(); 
message.messageId=("TEST MessageId 1234").getBytes();

And from MQExplorer :

  • From Inbound Queue : MessageId = TEST MessageId 1234
  • From Outbound Queue : MessageId = AMQ <QM_NAME> <some random(?) code>

There may be an obvious reason (though not obvious to me), but I don't get it yet. I read (correctly?) that the message ID can be generated by the queue manager in specific scenarios or by specific commands, but I don't see how that applies in Spring Integration.

Does anyone have an idea of how Spring Integration handles the messageId, and how I can retain the same one through my whole pipeline?

<beans>
        
    <int:channel id="channelMQ_MQ" ></int:channel>

    <!-- Source : MQseries -->
    <!-- ... -->
    <bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...       
    </bean>
    <!-- ... -->
    <bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
        <property name="autoStartup" value="false" />
        <property name="connectionFactory" ref="connectionFactoryCaching" />
        <property name="destination" ref="jmsQueue" />
        <!-- ... -->
        <property name="sessionTransacted" value="true"/>
    </bean>
    
    <int-jms:message-driven-channel-adapter 
        id="jmsIn" 
        container="myListener" 
        channel="channelMQ_MQ" 
        error-channel="processChannel1"/>
                                    

    <!-- Destination MQ_SERIES      -->
        <!-- ... -->
    <bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...
    </bean>
    
    <int-jms:outbound-channel-adapter   channel="channelMQ_MQ" 
                                        id="jmsOut2" 
                                        destination="jmsQueue2" 
                                        connection-factory="connectionFactoryCaching2" 
                                        delivery-persistent="true" 
                                        explicit-qos-enabled="true" 
                                        session-transacted="true" >
    </int-jms:outbound-channel-adapter>

                                        

</beans>

Edit 1:

Following @artem-bilan's advice, I set up a header-enricher. But at the moment it is not working at all: none of the properties are set.

    <int:channel id="channel_tmp">
    </int:channel>
    
    
    <int:header-enricher input-channel="channelMQ_MQ"  output-channel="channel_tmp"  id="headerEnricher1">
        <int:header name="MSI" expression="headers.jms_messageId"/>
        <int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
        <int:header name="MSGID" expression="headers.jms_messageId"/>
        <int:header name="MsgId" expression="headers.jms_messageId"/>
        <int:header name="CorrelId" expression="headers.jms_messageId"/>
        <int:header name="GroupId" expression="headers.jms_messageId"/>
        <int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
        <int:header name="offset" expression="headers.jms_messageId"/>
    </int:header-enricher>
    
    <int-jms:outbound-channel-adapter   channel="channel_tmp" 
                                        id="jmsOut2" 
                                        destination="jmsQueue2" 
                                        connection-factory="connectionFactoryCaching2" 
                                        delivery-persistent="true" 
                                        explicit-qos-enabled="true" 
                                        session-transacted="true" >
    </int-jms:outbound-channel-adapter>

Edit 2: after some research, we found an IBM doc stating that "To be able to set the Message ID, the JMS destination queue needs to have the property 'MQMD WRITE ENABLE' set to ENABLED. This property allows a JMS application to set the value of the MQMD fields." So we tried to set this property on our JmsQueue:

    <bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...
      <property name="MQMDWriteEnabled" value="true"></property>
      <property name="MQMDMessageContext" value="2"></property>
    </bean>

Unfortunately, although it was promising, this did not work for the messageId (but other MQMD fields work).

Edit 3:

Following Artem Bilan's advice to debug JmsHeaderMapper, we found that byte arrays are not supported by the header mapper (Spring Integration version 5.3.2.RELEASE) but are expected by IBM MQ, so the header is simply skipped. Thus, this will not work:

    <int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>
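
To illustrate why a byte[] header is dropped, here is a minimal, self-contained sketch of the kind of type check a JMS header mapper applies before copying a header into a JMS property. It is not Spring's actual code; the class and method names (SupportedPropertyTypes, isMappable) are hypothetical. The list of types is the set of property value types allowed by the JMS specification, which does not include byte[].

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics the type filter a JMS header mapper applies.
// Class and method names are hypothetical, not part of Spring Integration.
class SupportedPropertyTypes {

    // Property value types allowed by the JMS specification (boxed primitives and String).
    static final List<Class<?>> SUPPORTED = Arrays.asList(
            Boolean.class, Byte.class, Double.class, Float.class,
            Integer.class, Long.class, Short.class, String.class);

    // Returns true only when the header value has one of the supported types.
    static boolean isMappable(Object headerValue) {
        return headerValue != null && SUPPORTED.contains(headerValue.getClass());
    }

    public static void main(String[] args) {
        // A String header such as jms_messageId passes the filter.
        System.out.println(isMappable("ID:4142"));
        // A byte[] header such as JMS_IBM_MQMD_MsgId does not, so it would be skipped.
        System.out.println(isMappable(new byte[24]));
    }
}
```

With a filter like this, the SpEL expression above produces a byte[] value that never reaches the outgoing JMS message, which matches what we observed.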

Edit 4:

After noticing that the current version of spring-integration-jms does not accept the byte[] type (which is the IBM MSGID type), we added a custom header mapper. It worked, but we had to recover the ID (hex to bytes) from the already-mapped message ID (looking like "ID:3214F1044...") and pass it as a byte array in the "JMS_IBM_MQMD_MsgId" property. This was a dubious solution because of the triple conversion (MQ [BYTE24] => JMS [ID:String] => Java [byte[]] => MQ [BYTE24]).
Eventually, we found that the inbound queue, as well as the outbound one, can be configured to pass all context (JMS-mapped headers as well as raw MQ ones). Thus, we don't have to do complex mapping, only basic mapping (since byte[] values are still not mapped by DefaultJmsHeaderMapper).

So the final solution is:

<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
  ...
  <property name="MQMDMessageContext" value="2"></property>
  <property name="MQMDReadEnabled" value="true"></property>
</bean>

<bean id="jmsQueueOUT" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
    ...
    <property name="MQMDWriteEnabled" value="true"></property>
    <property name="MQMDMessageContext" value="2"></property>       
</bean>


<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
<int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
                                    id="jmsOut" 
                                    destination="jmsQueueOUT" 
                                    ... 
                                    header-mapper="mqCompatibleJmsHeaderMapper">
...                                     
</int-jms:outbound-channel-adapter>

_

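public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
  @Override
  public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
    // Raw MQMD message ID, present when MQMDReadEnabled is set on the inbound queue
    Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
    if (messageId != null) {
        if (messageId instanceof byte[]) {
            try {
                // The default mapper skips byte[] values, so copy this one explicitly
                jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
            } catch (JMSException e) {
                // setObjectProperty declares a checked exception; fromHeaders does not
                throw new IllegalStateException("Could not set " + WMQConstants.JMS_IBM_MQMD_MSGID, e);
            }
        } else {
         ...
        }
    }
    // Let the default mapper handle all remaining headers
    super.fromHeaders(headers, jmsMessage);
  }

...
}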

Solution

  • This answer compiles the various (and very useful!) comments and answers that helped solve our problem.

    After noticing that the current version of spring-integration-jms does not accept the byte[] type (which is the IBM MSGID type), we added a custom header mapper.

    The original raw msgId was not available from the inbound adapter: instead, we received JMS-mapped headers such as:

    JMSXAppID=com.my.company.test.MqProducer
    jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
    jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
    jms_messageId=ID:4142434423313233343536373839305f3100000000000000

    They were received this way from the listener attached to the inbound adapter.
    Thus, the first draft used a header mapper to transform jms_messageId (ID:...) into an IBM MQ BYTE24-compatible value (that is, a byte[] placed in the "JMS_IBM_MQMD_MsgId" property).

    But this was a hazardous solution because of the triple conversion (MQ [BYTE24] => JMS [ID:String] => Java [Byte[]] => MQ[BYTE24] )
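
For reference, the "ID:String => byte[]" step of that first draft can be sketched as below. This is a minimal illustration, not our production mapper: the class and method names (JmsIdToMsgId, toMsgIdBytes) are hypothetical, and it assumes the JMS message ID is "ID:" followed by exactly 48 hex digits (24 bytes), as in the headers shown above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a JMS-style "ID:<48 hex digits>" message ID
// back into the 24 raw bytes that the MQMD MsgId field holds.
class JmsIdToMsgId {

    static final int MQ_MSG_ID_LENGTH = 24; // MQMD MsgId is a 24-byte field

    static byte[] toMsgIdBytes(String jmsMessageId) {
        if (jmsMessageId == null || !jmsMessageId.startsWith("ID:")) {
            throw new IllegalArgumentException("Expected an ID: prefix");
        }
        String hex = jmsMessageId.substring(3);
        if (hex.length() != 2 * MQ_MSG_ID_LENGTH) {
            throw new IllegalArgumentException("Expected 48 hex digits, got " + hex.length());
        }
        byte[] out = new byte[MQ_MSG_ID_LENGTH];
        for (int i = 0; i < MQ_MSG_ID_LENGTH; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Invalid hex digit at byte " + i);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] id = toMsgIdBytes("ID:4142434423313233343536373839305f3100000000000000");
        // Print the non-padding part of the ID as ASCII text
        System.out.println(new String(id, 0, 17, StandardCharsets.US_ASCII));
    }
}
```

Every hop in that chain is another place where padding, case, or encoding can go wrong, which is why we looked for a way to carry the raw bytes instead.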

    Eventually, we found that the inbound queue, as well as the outbound one, can be configured to pass all context (JMS-mapped headers as well as raw MQMD ones), using MQMDMessageContext = 2 (set all context, i.e. WMQConstants.WMQ_MDCTX_SET_ALL_CONTEXT, the JMS counterpart of the MQPMO_SET_ALL_CONTEXT put option) and MQMDReadEnabled / MQMDWriteEnabled = true (depending on whether it is the inbound or the outbound queue). This way, all required fields were available from the start:

    JMS_IBM_MQMD_PutApplName=CustomOwnApplName
    JMSXAppID=com.my.company.test.MqProducer
    JMS_IBM_MQMD_ReplyToQ=
    jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
    JMS_IBM_MQMD_CorrelId=[B@47e0d39f
    jms_correlationId=ID:4142434423313233343536373839305f3100000000000000
    JMS_IBM_MQMD_MsgId=[B@399141ee
    jms_messageId=ID:4142434423313233343536373839305f3100000000000000

    Thus, we don't have to do any dubious mapping, only basic mapping (since byte[] values are still not mapped by DefaultJmsHeaderMapper).
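
A note on reading the header dump above: values such as [B@399141ee are just Java's default toString() for a byte[], not the content of the ID. To check that the raw JMS_IBM_MQMD_MsgId really matches jms_messageId, the bytes can be hex-encoded, as in this small sketch. The class and method names (MsgIdDump, toJmsStyleId) are hypothetical, and the lowercase "ID:<hex>" format is assumed from the headers shown above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: renders a raw MQMD MsgId byte[] in the same
// "ID:<hex>" form that appears in the jms_messageId header.
class MsgIdDump {

    static String toJmsStyleId(byte[] msgId) {
        StringBuilder sb = new StringBuilder("ID:");
        for (byte b : msgId) {
            // Mask to an unsigned value so negative bytes print as two hex digits
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Rebuild the 24-byte ID used in the example: ASCII text padded with zero bytes
        byte[] raw = Arrays.copyOf(
                "ABCD#1234567890_1".getBytes(StandardCharsets.US_ASCII), 24);
        System.out.println(toJmsStyleId(raw));
    }
}
```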

    So the final solution is:

    <bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
      ...
      <property name="MQMDMessageContext" value="2"></property>
      <property name="MQMDReadEnabled" value="true"></property>
    </bean>
    
    <bean id="jmsQueueOUT" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
        ...
        <property name="MQMDWriteEnabled" value="true"></property>
        <property name="MQMDMessageContext" value="2"></property>       
    </bean>
    
    
    <bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
    <int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
                                        id="jmsOut" 
                                        destination="jmsQueueOUT" 
                                        ... 
                                        header-mapper="mqCompatibleJmsHeaderMapper">
    ...                                     
    </int-jms:outbound-channel-adapter>
    

    _

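    public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
    ...
      @Override
      public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
        // Raw MQMD message ID, present when MQMDReadEnabled is set on the inbound queue
        Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
        if (messageId != null) {
            if (messageId instanceof byte[]) {
                try {
                    // The default mapper skips byte[] values, so copy this one explicitly
                    jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
                } catch (JMSException e) {
                    // setObjectProperty declares a checked exception; fromHeaders does not
                    throw new IllegalStateException("Could not set " + WMQConstants.JMS_IBM_MQMD_MSGID, e);
                }
            } else {
             ...
            }
        }
        // Let the default mapper handle all remaining headers
        super.fromHeaders(headers, jmsMessage);
      }
    
    ...
    }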