
How to match MQ Server reply messages to the correct request


I'm connecting to IBM WebSphere MQ and want to match each reply message with the request that produced it. I've trawled through hundreds of pages looking for how to do this and have had no luck.

I have a class, MQHandler, which sends a message to one defined queue and reads the reply from another. This works fine for a single user, but when multiple users use the application at the same time, the messages get mixed up.

I can't find a method on the receiver that lets me specify the correlation ID to match. Something like...

consumer.receive( selector );

Can you check the below methods to ensure I'm doing this correctly?

/**
 * When the class is called, this initialisation is done first.
 * 
 * @throws JMSException
 */
public void init() throws JMSException
{
    // Create a connection factory
    JmsFactoryFactory ff;
    try
    {
        ff = JmsFactoryFactory.getInstance( WMQConstants.WMQ_PROVIDER );
        cf = ff.createConnectionFactory();

        // Set the properties
        cf.setStringProperty( WMQConstants.WMQ_HOST_NAME, hostServer );
        cf.setIntProperty( WMQConstants.WMQ_PORT, 1414 );
        cf.setStringProperty( WMQConstants.WMQ_CHANNEL, channel );
        cf.setIntProperty( WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT );
        cf.setStringProperty( WMQConstants.WMQ_QUEUE_MANAGER, qManager );

        connection = cf.createConnection();

        session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
    }
    catch( JMSException e )
    {
        throw e;
    }

} // end of init



/**
 * @param request
 * @return
 * @throws JMSException
 */
private String sendRequest( String request ) throws JMSException
{

    // Create JMS objects
    Destination destination = session.createQueue( "queue:///" + writeQueueName );

    // Enable write of MQMD fields. See documentation for further
    // details.
    ((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_WRITE_ENABLED, true );

    // Set message context, if needed. See comment at the top.

    // Create a producer
    MessageProducer producer = session.createProducer( destination );

    // Create a message
    TextMessage message = session.createTextMessage( request );

    // Generate a custom correlation ID (this sets JMSCorrelationID, not the message ID)
    message.setJMSCorrelationID( generateRandomID() );

    // Start the connection
    connection.start();

    // And, send the message
    producer.send( message );
    System.out.println(message);

    return message.getJMSCorrelationID();
}


/**
 * @param customMessageId
 * @return
 * @throws JMSException
 */
private String recvResponse( String customMessageId ) throws JMSException
{
    Destination destination = session.createQueue( "queue:///" + readQueueName );

    // Enable read of MQMD fields.
    ((JmsDestination) destination).setBooleanProperty( WMQConstants.WMQ_MQMD_READ_ENABLED, true );
    ((JmsDestination) destination).setObjectProperty( WMQConstants.JMS_IBM_MQMD_CORRELID, customMessageId );

    // Create a consumer
    MessageConsumer consumer = session.createConsumer( destination );

    // Start the connection
    connection.start();

    // And, receive a message from the queue
    TextMessage receivedMessage = (TextMessage)consumer.receive( 15000 );

    connection.close();
    session.close();

    return receivedMessage.getText();
}

Here is a snippet of the main method...

    try
    {
        String customMessageId;
        init();
        customMessageId = sendRequest( request );
        return recvResponse( customMessageId );
    }
    catch( Exception ex )
    {
        System.out.println( "Error on MQ." );
        throw new Exception( "\n\n*** An error occurred ***\n\n" + ex.getLocalizedMessage()
                             + "\n\n**********************************" );
    }

Solution

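  • // Build the selector from the value held in customMessageId,
    // not from the literal text 'customMessageId'.
    String selector = "JMSCorrelationID='" + customMessageId + "'";
    QueueReceiver queueReceiver =
        session.createReceiver(destination, selector);

    TextMessage receivedMessage = (TextMessage)queueReceiver.receive( 15000 );

    Here customMessageId holds the correlation ID value you set earlier on the request. Note that createReceiver is a QueueSession method that takes a Queue; with a plain Session, as in the question, session.createConsumer(destination, selector) accepts the same selector string.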

    Also, I have seen many cases where people generate a correlation ID and set it on the outbound message, expecting to select the response on that value. The textbook approach is for the service provider application to copy the request's message ID into the reply's correlation ID. The requester still uses JMSCorrelationID in the selector, but with the original JMSMessageID as the value; that ID is assigned when the message is sent, so read it with message.getJMSMessageID() after producer.send() returns. Since the JMSMessageID is guaranteed to be unique even across queue managers, you are much less likely to get collisions on this value. You will need to ensure that your client matches the behavior of the service provider with respect to which value gets copied into the correlation ID.
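
To make the message ID to correlation ID pattern concrete, here is a minimal in-memory sketch. It does not use JMS or MQ at all: the Msg class, the two lists standing in for the request and reply queues, and the method names are all hypothetical, chosen only for illustration. The one piece that carries over directly is selectorFor, which builds the selector string you would pass to createConsumer or createReceiver (doubling any single quote in the value, as selector string literals require).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

/**
 * In-memory sketch of "responder copies request message ID into reply correlation ID".
 * Msg and the two lists are hypothetical stand-ins for JMS messages and queues.
 */
public class CorrelationSketch {

    /** Hypothetical stand-in for a JMS message. */
    public static final class Msg {
        public final String messageId;     // assigned when the message is "sent"
        public final String correlationId; // set by the responder, null on requests
        public final String body;

        public Msg(String messageId, String correlationId, String body) {
            this.messageId = messageId;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private static final List<Msg> requestQueue = new ArrayList<>();
    private static final List<Msg> replyQueue = new ArrayList<>();

    /** Builds a JMS selector string; single quotes in the value are doubled. */
    public static String selectorFor(String correlationId) {
        return "JMSCorrelationID='" + correlationId.replace("'", "''") + "'";
    }

    /** Requester side: "send" a request and return the message ID assigned at send time. */
    public static String sendRequest(String body) {
        String id = "ID:" + UUID.randomUUID();
        requestQueue.add(new Msg(id, null, body));
        return id;
    }

    /** Responder side: take the oldest request and reply with correlationId = request messageId. */
    public static void serveOne() {
        Msg request = requestQueue.remove(0);
        replyQueue.add(new Msg("ID:" + UUID.randomUUID(), request.messageId,
                               "reply to " + request.body));
    }

    /** Requester side: remove and return the reply matching the request's message ID, or null. */
    public static Msg receiveReply(String requestMessageId) {
        for (Iterator<Msg> it = replyQueue.iterator(); it.hasNext();) {
            Msg candidate = it.next();
            if (requestMessageId.equals(candidate.correlationId)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String idA = sendRequest("A");
        String idB = sendRequest("B");
        serveOne();
        serveOne();
        // Two "users" share the reply queue, yet each gets its own reply,
        // regardless of the order in which they ask.
        System.out.println(receiveReply(idB).body); // prints "reply to B"
        System.out.println(receiveReply(idA).body); // prints "reply to A"
        System.out.println(selectorFor(idA));
    }
}
```

In real JMS code the loop in receiveReply is replaced by the provider's selector matching: you would create the consumer with selectorFor(requestMessageId) and call receive with a timeout, checking the result for null before reading the text.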