Search code examples
javaibm-mq

Trying to setup MQ with a GET message off the queue (not JMS)


I am using the low level direct IBM MQ library to push messages on the queue and retrieve them. I was trying to setup the application such that messages can come in, say by pulling data of a database and then pushing records onto the queue and actually same code may read messages. I was mostly wanted to setup a thread that would pull messages once they come up.

This code runs, the first PUT works but the second one does not work and hangs. Am I not understanding the flow here

Also, if I take the code from the second below around the "GET", could I write a thread that calls that routine every 500 milliseconds, waiting for new messages to come in.

        final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                    | MQC.MQPMO_SYNC_RESPONSE;
            this.mqPMO = new MQPutMessageOptions();
            this.mqPMO.options = putOptions;
            // This code hangs !!!! (error here)
            mqueue.put(msg, this.mqPMO);

...

public void bootstap() {
        MQEnvironment.hostname = "localhost";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "DEV.ADMIN.SVRCONN";
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "my_application_name");
        MQEnvironment.enableTracing(5);

        MQQueueManager mqManager = null;
        MQQueue mqueue = null;
        try {
            // MQCNO_CLIENT_BINDING is not available for Java or .NET as they have their own mechanisms for choosing the bind type.
            final String qmName = "QM1";
            final String userId = "admin";
            final String Password = "passw0rd";
            final Hashtable h = new Hashtable();

            h.put(MQConstants.USER_ID_PROPERTY, userId);
            h.put(MQConstants.PASSWORD_PROPERTY, Password);
            h.put(MQConstants.USE_MQCSP_AUTHENTICATION_PROPERTY, true);
            mqManager = new MQQueueManager(qmName, h);
            //mqManager = new MQQueueManager(qmName, WMQConstants.WMQ_CM_BINDINGS);
            
            this.mqGMO = new MQGetMessageOptions();
            this.mqGMO.options = MQC.MQGMO_NO_SYNCPOINT |
                    MQC.MQGMO_WAIT |
                    MQC.MQGMO_CONVERT |
                    MQC.MQGMO_FAIL_IF_QUIESCING;
            this.mqGMO.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
            this.mqGMO.waitInterval = MQC.MQWI_UNLIMITED;

            int openOptions =  MQC.MQOO_INPUT_SHARED |
                    MQC.MQOO_OUTPUT;
            mqueue = mqManager.accessQueue("DEV.QUEUE.1", openOptions);
            logger.info(">> Find connection handle queue manager - " + mqueue);

            {
                final MQMessage msg = new MQMessage();
                final String correlId = "0002";
                final String byteArry = this.hexStringToByteArray(correlId);

                logger.info(">>> correlId: " + correlId);
                logger.info(">>> byteArry: " + byteArry);

                msg.correlationId = byteArry.getBytes();
                msg.format = MQConstants.MQFMT_STRING;
                // ... and write some text in UTF8 format
                msg.writeUTF("{{ Hello, World }}}");

                // Use the default put message options...
                // Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
                final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                        | MQC.MQPMO_SYNC_RESPONSE;
                this.mqPMO = new MQPutMessageOptions();
                this.mqPMO.options = putOptions;

                // put the message //
                mqueue.put(msg, this.mqPMO);
                logger.info(" >>> Continue to get routine");
            }

            {
                // This code works !!! get the message
                MQMessage retrievedMessage = new MQMessage();
                retrievedMessage.correlationId = this.hexStringToByteArray("0001").getBytes();
                mqueue.get(retrievedMessage, this.mqGMO);

                // And prove we have the message by displaying the UTF message text
                String msgText = retrievedMessage.readUTF();
                logger.info("~~~~ The message is: " + msgText);
            }

            {
                final MQMessage msg = new MQMessage();
                final String correlId = "0001";
                final String byteArry = this.hexStringToByteArray(correlId);

                logger.info(">>> correlId: " + correlId);
                logger.info(">>> byteArry: " + byteArry);

                msg.correlationId = byteArry.getBytes();
                msg.format = MQConstants.MQFMT_STRING;
                // ... and write some text in UTF8 format
                msg.writeUTF("{{ Hello, World }}}");

                // Use the default put message options...
                // Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
                final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                        | MQC.MQPMO_SYNC_RESPONSE;
                this.mqPMO = new MQPutMessageOptions();
                this.mqPMO.options = putOptions;
                // This code hangs !!!! (error here)
                mqueue.put(msg, this.mqPMO);
            }


            mqueue.close();
            mqManager.disconnect();
        } catch(final Exception e) {
            logger.error("Error at MQ manager", e);
        }
    }

Solution

  • First off, do NOT use MQEnvironment class as it is not thread safe. You should be using a Hashtable for MQ connection information.

    Secondly, what is with all of the final declarations? Very odd.

    Your code doesn't make any sense. Here is what I see your code doing:

    • Set MQ connection information uses the MQEnvironment class

    • Set UserId and Password in a Hashtable

    • Connect to the queue manager

    • Open the queue

    • Set 'correlId' to "0002"

      final String byteArry = this.hexStringToByteArray(correlId);

    This line of code does not make any sense. The method name doesn't match your code. A Hex string should be in the format of "30303032" for "0002" and return a byte array i.e. byte[] but it is returning a String. So, I have no idea what the hexStringToByteArray method is doing.

    Also, the MsgId, CorrelId and GroupId fields of the MQMD structure are 24 bytes in length.

    • Write the message data in a UTF format. Why? Does the receiving application require UTF format?
    • Put the message on the queue.
    • Set the CorrelId for the receive message to "0001" but converted to a strange representation by hexStringToByteArray method.
    • Get a message from the queue with a wait interval of unlimited. Since there is no message on the queue that will match that CorrelId then MQ client library will wait forever!!!!!
    • Create another MQMessage and set the CorrelId to "0001" but converted to a strange representation by hexStringToByteArray method.
    • Write the message data in a UTF format. Why? Does the receiving application require UTF format?
    • Put the message on the queue.
    • Close the queue
    • Disconnect from the queue manager

    Here is a fully functioning Java/MQ program that will put 2 messages on a queue with unique CorrelIds (i.e. "0001" & "0002") and then retrieve the message with a CorrelId of "0002".

    import java.io.IOException;
    import java.text.DecimalFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Hashtable;
    
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQGetMessageOptions;
    import com.ibm.mq.MQMessage;
    import com.ibm.mq.MQPutMessageOptions;
    import com.ibm.mq.MQQueue;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.CMQC;
    
    /**
     * Program Name
     *  MQTest11B
     *
     * Description
     *  This java class will connect to a remote queue manager with the
     *  MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds
     *  and then retrieve the message with a  CorrelId of "0002".
     *
     * Sample Command Line Parameters
     *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
     *
     * @author Roger Lacroix
     */
    public class MQTest11B
    {
       private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
    
       private Hashtable<String,String> params;
       private Hashtable<String,Object> mqht;
       private String qMgrName;
       private String outputQName;
    
       /**
        * The constructor
        */
       public MQTest11B()
       {
          super();
          params = new Hashtable<String,String>();
          mqht = new Hashtable<String,Object>();
       }
    
       /**
        * Make sure the required parameters are present.
        * @return true/false
        */
       private boolean allParamsPresent()
       {
          boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                      params.containsKey("-c") && params.containsKey("-m") &&
                      params.containsKey("-q") &&
                      params.containsKey("-u") && params.containsKey("-x");
          if (b)
          {
             try
             {
                Integer.parseInt((String) params.get("-p"));
             }
             catch (NumberFormatException e)
             {
                b = false;
             }
          }
    
          return b;
       }
    
       /**
        * Extract the command-line parameters and initialize the MQ HashTable.
        * @param args
        * @throws IllegalArgumentException
        */
       private void init(String[] args) throws IllegalArgumentException
       {
          int port = 1414;
          if (args.length > 0 && (args.length % 2) == 0)
          {
             for (int i = 0; i < args.length; i += 2)
             {
                params.put(args[i], args[i + 1]);
             }
          }
          else
          {
             throw new IllegalArgumentException();
          }
    
          if (allParamsPresent())
          {
             qMgrName = (String) params.get("-m");
             outputQName = (String) params.get("-q");
    
             try
             {
                port = Integer.parseInt((String) params.get("-p"));
             }
             catch (NumberFormatException e)
             {
                port = 1414;
             }
             
             mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
             mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
             mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
             mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
             mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
    
             // I don't want to see MQ exceptions at the console.
             MQException.log = null;
          }
          else
          {
             throw new IllegalArgumentException();
          }
       }
    
       /**
        * Connect, open queue, write a message, close queue and disconnect.
        *
        */
       private void testSendAndReceive()
       {
          MQQueueManager qMgr = null;
          MQQueue queue = null;
          int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
          MQPutMessageOptions pmo = new MQPutMessageOptions();
          pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
          MQGetMessageOptions gmo = new MQGetMessageOptions();
          gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
          gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
          gmo.waitInterval = CMQC.MQWI_UNLIMITED;
          MQMessage sendmsg;
          String msgData;
          DecimalFormat df = new DecimalFormat("0000");
    
          try
          {
             qMgr = new MQQueueManager(qMgrName, mqht);
             logger("successfully connected to "+ qMgrName);
    
             queue = qMgr.accessQueue(outputQName, openOptions);
             logger("successfully opened "+ outputQName);
             
             /*
              * Code to send 2 messages with a specific CorrelId.  i.e. 0001 and 0002
              */
             for (int i=0; i < 2; i++)
             {
                // Define a simple MQ message, and write some text
                sendmsg = new MQMessage();
                sendmsg.format = CMQC.MQFMT_STRING;
                sendmsg.messageId = CMQC.MQMI_NONE;
                sendmsg.correlationId = df.format(i+1).getBytes();
    
                // Write message data
                msgData = "This is a test message from MQTest11B. CorrelID is "+new String(sendmsg.correlationId);
                sendmsg.writeString(msgData);
    
                // put the message on the queue
                queue.put(sendmsg, pmo);
                logger("Sent: Message Data>>>" + msgData);
             }
             
             /*
              * Code to receive a message with a specific CorrelId.  i.e. 0002
              */
             
             // Define a simple MQ message, and write some text
             MQMessage receiveMsg = new MQMessage();
             receiveMsg.messageId = CMQC.MQMI_NONE;
             receiveMsg.correlationId = "0002".getBytes();
    
             // get the message on the queue
             queue.get(receiveMsg, gmo);
    
             if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
             {
                String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
                logger("Received: Message Data>>>" + msgStr);
             }
             else
             {
                byte[] b = new byte[receiveMsg.getMessageLength()];
                receiveMsg.readFully(b);
                logger("Received: Message Data>>>" + new String(b));
             }
          }
          catch (MQException e)
          {
             logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
          }
          catch (IOException e)
          {
             logger("IOException:" +e.getLocalizedMessage());
          }
          finally
          {
             try
             {
                if (queue != null)
                {
                   queue.close();
                   logger("closed: "+ outputQName);
                }
             }
             catch (MQException e)
             {
                logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
             }
             try
             {
                if (qMgr != null)
                {
                   qMgr.disconnect();
                   logger("disconnected from "+ qMgrName);
                }
             }
             catch (MQException e)
             {
                logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
             }
          }
       }
    
       /**
        * A simple logger method
        * @param data
        */
       public static void logger(String data)
       {
          String className = Thread.currentThread().getStackTrace()[2].getClassName();
    
          // Remove the package info.
          if ( (className != null) && (className.lastIndexOf('.') != -1) )
             className = className.substring(className.lastIndexOf('.')+1);
    
          System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
       }
    
       /**
        * main line
        * @param args
        */
       public static void main(String[] args)
       {
          MQTest11B write = new MQTest11B();
    
          try
          {
             write.init(args);
             write.testSendAndReceive();
          }
          catch (IllegalArgumentException e)
          {
             logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
             System.exit(1);
          }
    
          System.exit(0);
       }
    }
    

    And the output should look like this:

    2021/07/02 14:01:59.316 MQTest11B: testSendAndReceive: successfully connected to MQA1
    2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: successfully opened TEST.Q1
    2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0001
    2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
    2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Received: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
    2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: closed: TEST.Q1
    2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: disconnected from MQA1