As I may handle message for long time which exceeded maximum locking time. I would like to know the syntax in NodeJS to renew message lock.
let { ServiceBusClient, ReceiveMode } = require('@azure/service-bus'),
serviceBusClient = ServiceBusClient.createFromConnectionString(SERVICE_BUS_SECRET),
queueClient = serviceBusClient.createQueueClient(XXX_QUEUE),
receiver = queueClient.createReceiver(ReceiveMode.peekLock),
errorHandler = function() {
...
},
messageHandler = function(message) {
// want to renew message lock here
}
receiver.registerMessageHandler(messageHandler, errorHandler);
When we use the method registerMessageHandler
, we can provide the option maxMessageAutoRenewLockDurationInSeconds
. It means that the maximum duration in seconds until which the lock on the message will be renewed by the SDK automatically. But please note that this auto-renewal stops once the message is settled or once the user-provided onMessage handler completes its execution. For more details, please refer to here and here
Besides, if you want to manually renew the lock, please set maxMessageAutoRenewLockDurationInSeconds
as zero.
For example
import {
SendableMessageInfo,
ServiceBusClient,
OnMessage,
OnError,
delay,
ReceiveMode,
ServiceBusMessage,
MessagingError,
} from "@azure/service-bus";
const connectionString =
"";
const queueName = "myqueue";
let receivedMessage: ServiceBusMessage;
let elapsedTime = 0;
const interval = 1000 * 10;
const testDurationInMilliseconds = 1000 * 60;
async function sendMessage(): Promise<void> {
const ns = ServiceBusClient.createFromConnectionString(connectionString);
const client = ns.createQueueClient(queueName);
try {
const sender = client.createSender();
const message: SendableMessageInfo = {
messageId: "test",
body: "test",
label: `test`,
};
console.log("send");
await sender.send(message);
await sender.close();
} finally {
await client.close();
await ns.close();
}
}
async function receiveMessage(): Promise<void> {
const ns = ServiceBusClient.createFromConnectionString(connectionString);
const client = ns.createQueueClient(queueName);
try {
const receiver = client.createReceiver(ReceiveMode.peekLock);
const receiverPromise = new Promise((resolve, _reject) => {
const onMessageHandler: OnMessage = async (brokeredMessage) => {
receivedMessage = brokeredMessage;
console.log("Received message: ", receivedMessage.messageId);
const startTime = Date.now();
while (elapsedTime < testDurationInMilliseconds) {
// simulate the user making an async call that takes time.
await delay(interval);
const data = await receiver.renewMessageLock(receivedMessage);
elapsedTime = Date.now() - startTime;
// log how long we've executed.
console.log(`still executing after ${elapsedTime}`);
console.log(data.toJSON());
console.log("\n");
}
await brokeredMessage.complete();
console.log("Completed message: ", receivedMessage.messageId);
};
const onErrorHandler: OnError = (err) => {
if ((err as MessagingError).retryable === true) {
console.log(
"Receiver will be recreated. A recoverable error occurred:",
err
);
resolve();
} else {
console.log("Error occurred: ", err);
}
};
receiver.registerMessageHandler(onMessageHandler, onErrorHandler, {
autoComplete: false,
maxMessageAutoRenewLockDurationInSeconds: 0,
});
});
await receiverPromise;
await receiver.close();
} finally {
await client.close();
await ns.close();
}
}