Infra-Overview:
I have a setup where I read a set of messages from IBM MQ, process them in a Kubernetes (k8s) cluster, and send them to the destination host.
Issue:
I observed that sometimes the message volume is huge, and our pod fails and restarts before the messages are sent to the destination host. Because we follow the read-and-delete approach from the ibmmq examples, we lose all of those messages.
Expected Solution:
I am looking for a solution where we don't lose track of the messages until they have been sent to the destination host.
What I tried:
IBM MQ has the concept of a unit of work, but since we can't afford a delay in reading and processing, I can't wait for a single message to be processed before reading the next one, as that could be a major performance setback.
Code language:
Node.js
As the comments suggest, there are a number of ways to skin this cat, but you will need to use transactions.
A unit of work (transaction) begins when you get or put the first message under syncpoint. It ends, and the next one begins, when you either commit or roll back.
So you should handle the messages in batches that make sense for your application, and commit when each batch is complete. If k8s kills your application, the queue manager rolls back all uncommitted gets, so those messages return to the queue instead of being lost. Combine this with a backout queue to stop poison messages from looping forever.
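A minimal sketch of the batch approach, assuming a hypothetical client object whose get(), commit() and rollback() methods wrap a syncpoint get, mq.Cmit and mq.Back, and a hypothetical send() that delivers one message to the destination host. None of these names come from the ibmmq package itself; they stand in for whatever adapter your app uses.

```javascript
// Sketch only: `client` and `send` are hypothetical adapters, not part of
// the ibmmq package. client.get() is assumed to do a syncpoint get and
// resolve to a message, or to null when the queue is empty.
async function processBatch(client, send, batchSize) {
  const batch = [];
  try {
    // Every get joins the current unit of work, so nothing is removed
    // from the queue until commit() succeeds.
    while (batch.length < batchSize) {
      const msg = await client.get();
      if (msg === null) break; // no more messages right now
      batch.push(msg);
    }
    // Deliver the whole batch to the destination host.
    for (const msg of batch) {
      await send(msg);
    }
    if (batch.length > 0) {
      await client.commit(); // the gets now take effect
    }
    return batch.length;
  } catch (err) {
    // On any failure, roll back so the batch stays on the queue.
    await client.rollback();
    throw err;
  }
}
```

Because the commit happens only after every message in the batch has been sent, a pod that dies between sending and committing will see those messages again after it restarts. Delivery is therefore at-least-once, and the destination should tolerate duplicates. The batch size trades throughput (fewer commits) against how much work is redone after a failure.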
Section added to show sample code, and explanation of backout queues.
In your normal processing, if an app is stopped before it has had time to process a message, you want that message returned to the queue, so that it is still available to be processed.
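To enable this rollback, you need to OR MQC.MQGMO_SYNCPOINT into the get message options (and MQC.MQPMO_SYNCPOINT into the put message options for any puts that should be part of the same unit of work):
gmo.Options |= MQC.MQGMO_SYNCPOINT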
Then if all goes well, you can commit.
mq.Cmit(hConn, function(err) {
  if (err) {
    debug_warn('Error on commit', err);
  } else {
    debug_info('Commit was successful');
  }
});
or rollback
mq.Back(hConn, function(err) {
  if (err) {
    debug_warn('Error on rollback', err);
  } else {
    debug_info('Rollback was successful');
  }
});
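The callback-style mq.Cmit and mq.Back calls shown above can be wrapped in promises so they fit into async/await code such as a batch loop. This is a sketch: mqApi is assumed to be the loaded ibmmq module (or anything with the same Cmit(hConn, cb) and Back(hConn, cb) shape), and the helper names commitAsync and rollbackAsync are hypothetical.

```javascript
// Hypothetical helpers: turn the callback-style commit/rollback calls
// into promises. `mqApi` is expected to expose Cmit(hConn, cb) and
// Back(hConn, cb), where cb receives an error (or nothing) on completion.
function commitAsync(mqApi, hConn) {
  return new Promise((resolve, reject) => {
    mqApi.Cmit(hConn, (err) => (err ? reject(err) : resolve()));
  });
}

function rollbackAsync(mqApi, hConn) {
  return new Promise((resolve, reject) => {
    mqApi.Back(hConn, (err) => (err ? reject(err) : resolve()));
  });
}

// Usage inside an async function, with the real module:
//   const mq = require('ibmmq');
//   await commitAsync(mq, hConn);
```

Calling the functions as methods of mqApi keeps any this binding the module might rely on.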
If you roll back, the message goes back to the queue, which means it is also the next message your app will read. This can create a poison message loop, so you should also set up a backout queue, with pass all context permissions for your app user, and a backout threshold.
Say you set the threshold to 5: the message can then be read, and rolled back, 5 times. Your app needs to check the threshold, decide that the message is a poison message, and move it off the queue.
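The decision described above can be sketched as a small function. It assumes backoutCount is read from mqmd.BackoutCount and threshold from the queue's backout threshold attribute; the function name and the returned action strings are hypothetical labels. Treating a threshold of 0 (the queue default) as "no threshold configured" is a choice made in this sketch.

```javascript
// Hypothetical decision helper for a message read under syncpoint.
// backoutCount: how many times this message has already been rolled back.
// threshold: the queue's backout threshold (0 is treated as "not set").
function nextAction(backoutCount, threshold) {
  if (threshold > 0 && backoutCount >= threshold) {
    return 'MOVE_TO_BACKOUT_QUEUE'; // poison message: take it off the queue
  }
  return 'PROCESS'; // normal path: process, then commit (or roll back on error)
}

console.log(nextAction(4, 5)); // PROCESS
console.log(nextAction(5, 5)); // MOVE_TO_BACKOUT_QUEUE
```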
To check the backout threshold (and the backout queue name), you can use the following code:
// Remember to OR in the inquire option when opening the queue
openOptions |= MQC.MQOO_INQUIRE;
...
let threshold;     // backout threshold (BOTHRESH)
let backoutQName;  // backout requeue queue name (BOQNAME)
const attrs = [new mq.MQAttr(MQC.MQIA_BACKOUT_THRESHOLD),
               new mq.MQAttr(MQC.MQCA_BACKOUT_REQ_Q_NAME)];
mq.Inq(hObj, attrs, (err, selectors) => {
  if (err) {
    debug_warn('Error retrieving backout threshold', err);
  } else {
    debug_info('Attributes have been found');
    selectors.forEach((s) => {
      switch (s.selector) {
        case MQC.MQIA_BACKOUT_THRESHOLD:
          threshold = s.value;
          debug_info('Threshold is ', s.value);
          break;
        case MQC.MQCA_BACKOUT_REQ_Q_NAME:
          backoutQName = s.value;
          debug_info('Backout queue is ', s.value);
          break;
      }
    });
  }
});
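When getting a message, your app can use mqmd.BackoutCount to check how many times the message has been rolled back:
if (mqmd.BackoutCount >= threshold) {
  // Poison message: move it to the backout queue and commit,
  // instead of rolling it back again.
  ...
}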
What I have noticed is that if the same application instance repeatedly rolls back the same message, then at the threshold an MQRC_HOBJ_ERROR error is thrown, which your app can check for before discarding the message. A different app instance does not get the MQRC_HOBJ_ERROR error, so it has to check the backout threshold itself and then discard the message, remembering to commit the discard action.
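The two cases described above can be sketched as follows. This assumes, as with errors raised by the ibmmq package, that the MQ reason code is available in an mqrc property; 2019 is the numeric value of MQRC_HOBJ_ERROR, and real code should compare against MQC.MQRC_HOBJ_ERROR instead. The client.put(), client.commit() and client.rollback() methods are hypothetical wrappers around a syncpoint put, mq.Cmit and mq.Back. Moving the message means putting it to the backout queue in the same unit of work as the original get and then committing, so the get and the put take effect together.

```javascript
// 2019 is the numeric value of MQRC_HOBJ_ERROR; prefer MQC.MQRC_HOBJ_ERROR
// from the ibmmq module in real code.
const MQRC_HOBJ_ERROR = 2019;

// Assumes the MQ reason code is exposed as err.mqrc.
function isHobjError(err) {
  return Boolean(err) && err.mqrc === MQRC_HOBJ_ERROR;
}

// Hypothetical client: put() is assumed to put under syncpoint, so the put
// and the original get form one unit of work that commit() makes permanent.
async function moveToBackoutQueue(client, msg, backoutQName) {
  try {
    await client.put(backoutQName, msg);
    await client.commit(); // commit the discard: off the input queue, onto the backout queue
  } catch (err) {
    await client.rollback(); // message returns to the input queue
    throw err;
  }
}
```

If the put or the commit fails, the sketch rolls back, which returns the message to the input queue with its backout count incremented.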
See https://github.com/ibm-messaging/mq-dev-patterns/tree/master/transactions/JMS/SE for more information.
As an alternative, you could use KEDA (https://keda.sh), which works with k8s to monitor your queue depth and scale according to the number of messages waiting to be processed, as opposed to CPU or memory consumption. That way you can scale up when lots of messages are waiting to be processed, and slowly scale down when the queue becomes manageable. Here is a link to get started: https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Go-K8s. The example is for a Go app, but it applies equally to Node.js.
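To get a feel for how queue-depth-based scaling behaves, the arithmetic below roughly models what the Kubernetes Horizontal Pod Autoscaler, which KEDA drives, does with an average-value external metric: divide the queue depth by a per-pod target and round up, then clamp to the configured minimum and maximum. It is a simplification; the real autoscaler also applies a tolerance and stabilization windows, which is part of why scale-down is gradual. The function and parameter names are illustrative, not KEDA configuration keys.

```javascript
// Rough model of queue-depth scaling (not KEDA's code): with an
// average-value target, the autoscaler aims for about `targetPerPod`
// waiting messages per replica, within the configured bounds.
function desiredReplicas(queueDepth, targetPerPod, minReplicas, maxReplicas) {
  const raw = Math.ceil(queueDepth / targetPerPod);
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}

console.log(desiredReplicas(95, 20, 1, 10)); // 5
console.log(desiredReplicas(1000, 20, 1, 10)); // 10 (capped at the maximum)
```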