Search code examples
node.jsibm-mq

IBM MQ node js library 'ibmmq' v2 get messages using SYNCPOINT


This question is specific to the new v2 version of 'ibmmq' NodeJS package. There are some breaking changes in this version. So, answers for the older v1 version will not help!

I am trying to use this package to get messages using MQGMO_SYNCPOINT flag in the gmo (get message options) object.

e.g.

// see https://www.ibm.com/docs/en/ibm-mq/9.2?topic=mqgmo-options-mqlong
gmo.Options =
  MQC.MQGMO_SYNCPOINT |
  MQC.MQGMO_WAIT |
  MQC.MQGMO_CONVERT |
  MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.MatchOptions = MQC.MQMO_NONE;
gmo.WaitInterval = this.waitInterval * 1000; // 60 seconds during testing
mq.Get(this.queueHandle, new mq.MQMD(), gmo, this.getMessagesCB);

The MQGMO_SYNCPOINT option ensures that the message is not removed from the queue until I later commit the message using mq.Cmit call.

However, when I make that call in my callback function this.getMessagesCB e.g.

mq.Cmit(this.connectionHandle, function (err) {
  if (err) {
    console.log("Error on commit", err);
  }
});

I get the following error:

{
  "name": "MQError",
  "message": "CMIT: MQCC = MQCC_FAILED [2] MQRC = MQRC_HCONN_ASYNC_ACTIVE [2500]",
  "mqcc": 2,
  "mqccstr": "MQCC_FAILED",
  "mqrc": 2500,
  "mqrcstr": "MQRC_HCONN_ASYNC_ACTIVE",
  "version": "2.0.1",
  "verb": "CMIT"
}

With this new version of 'ibmmq' NodeJS package, we have to call the Ctl method with one of the start options. I am using MQOP_START as shown in the samples included in the v2 version. e.g.

mq.Ctl(this.connectionHandle, mq.MQC.MQOP_START, function (error) {
  blah blah
})

Reading the control callbacks documentation says:

MQOP_START Start the consuming of messages for all defined message consumer functions for the specified connection handle.

Callbacks run on a thread started by the system, which is different from any of the application threads.

This operation gives control of the provided connection handle to system. The only MQI calls which can be issued by a thread other than the consumer thread are:

  • MQCTL with Operation MQOP_STOP
  • MQCTL with Operation MQOP_SUSPEND
  • MQDISC - Performs MQCTL with Operation MQOP_STOP before disconnection the HConn.

MQRC_HCONN_ASYNC_ACTIVE is returned if an IBM® MQ API call is issued while the connection handle is started, and the call does not originate from a message consumer function.

So, I tried, issuing a new Ctl call with MQOP_STOP, before calling mq.Cmit e.g.

mq.Ctl(this.connectionHandle, mq.MQC.MQOP_STOP, function (error) {
  if (error) {
    logError({ msg: `mq.CTL STOP error: ${formatErr(error)}`, error });
  } else {
    logInfo(`initiated MQOP_STOP`);
    mq.Cmit(this.connectionHandle, this.callbackOnCommit);
  }
});

Now I get a different error:

libc++abi: terminating due to uncaught exception of type Napi::Error

I also tried replacing MQOP_STOP call with MQOP_SUSPEND and get the same error as with MQOP_STOP

Help!


Solution

  • For future readers, this issue was raised on GitHub in the IBM MQ Messaging repository, where Mark Taylor (author of the IBM MQ nodejs MQI library) provided advice. The issue can be found here: https://github.com/ibm-messaging/mq-mqi-nodejs/issues/169

    To help ensure the above link does not become dead at any point in the future, I have also had the wayback machine from the internet archive index the page, which is now available here: https://web.archive.org/web/20231014005837/https://github.com/ibm-messaging/mq-mqi-nodejs/issues/169

    For a quick TL:DR, quoting Mark from the issue:

    The fundamental reason for getting the MQRC_HCONN_ASYNC_ACTIVE is that the NodeJS callback function is not running on the same thread as the real C-driven callback function. So you can't call MQI functions directly from that NodeJS callback.

    It was also identified by AnVad that the Ctl and Cmit calls in the nodejs mqi library are synchronous rather than asynchronous as would be typically expected for a JS library. To avoid getting the error mentioned above (MQRC_HCONN_ASYNC_ACTIVE), the following needs to happen:

    1. Call Ctl with MQOP_STOP specified.
    2. Call some asynchronous code and await the result
    3. Call Ctl with MQOP_STOP specified. (Again)
    4. Call Cmit.
    5. Call Ctl with MQOP_START specified.

    Steps 3-5 need to be executed in a single synchronous block, otherwise the aforementioned error will get thrown.

    A full code example can be seen here: https://github.com/ibm-messaging/mq-mqi-nodejs/issues/169#issuecomment-1753355483. The example is also stored within the wayback machine via the link provided earlier in this answer. For easier viewing, here's the code from AnVad's github comment:

    // register the new message callback in the class `init` function
    init() {
      try {
          const cno = new mq.MQCNO();
          cno.Options = mq.MQC.MQCNO_CLIENT_BINDING; // connecting as client
          const cd = new mq.MQCD();
          cd.ConnectionName = mqConfig.connectionName;
          cd.ChannelName = mqConfig.channelName;
          cno.ClientConn = cd;
          
          this.hConn = await mq.ConnxPromise(qMgr, cno);
          const od = new mq.MQOD();
          od.ObjectName = qName;
          od.ObjectType = mq.MQC.MQOT_Q;
        
          let openOptions = mq.MQC.MQOO_INPUT_SHARED | mq.MQC.MQGMO_SYNCPOINT;
          this.queueHandle = await mq.OpenPromise(this.hConn, od, openOptions);
        
          let md = new mq.MQMD(); // message descriptor?
          const gmo = new mq.MQGMO(); // get message options
          const MQC = mq.MQC;
          gmo.Options = MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT | MQC.MQGMO_FAIL_IF_QUIESCING;
          gmo.WaitInterval = MQC.MQWI_UNLIMITED; // wait forever
          gmo.MatchOptions = MQC.MQMO_NONE;
    
          this.md = md;
          this.gmo = gmo;
        
          mq.Get(this.queueHandle, md, gmo, this.getMessagesCB);
      } catch(error) {
        console.error('Error in registering MQ callback', error);
      }
    }
    
    // getMessagesCB is a class method that was set as the callback in a call to `mq.Get()`
    getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
      if (error) {
        console.error('error in getMessages callback', error);
        return;
      }
      try {
        mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);
      
        // do my async thing with this message
        await writeMessageToDB(buf.toString('utf8'));
    
        // now I need to call suspend again since the await keyword used above puts me in a continuation, else I get a MQRC_HCONN_ASYNC_ACTIVE error
        // so, calling suspend, Cmit and resume in the same synchronous region of code
        mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);
        mq.Cmit(this.hConn);
        mq.Ctl(this.hConn, mq.MQC.MQOP_RESUME);
      } catch(error) {
        console.error('error saving message to DB and committing message in MQ', error);
      }
    }
    

    Special thanks to @AnVad for providing the additional context for this answer, and to Mark Taylor for assisting him over on GitHub to troubleshoot the issue.