Search code examples
node.jslockingazureservicebusservicebus

NodeJS @azure/service-bus: How to renewLock for a message


As I may handle message for long time which exceeded maximum locking time. I would like to know the syntax in NodeJS to renew message lock.

let { ServiceBusClient, ReceiveMode } = require('@azure/service-bus'),
serviceBusClient = ServiceBusClient.createFromConnectionString(SERVICE_BUS_SECRET),
queueClient = serviceBusClient.createQueueClient(XXX_QUEUE),
receiver = queueClient.createReceiver(ReceiveMode.peekLock),
errorHandler = function() {
    ...
},
messageHandler = function(message) {
    // want to renew message lock here
}

receiver.registerMessageHandler(messageHandler, errorHandler);

Solution

  • When we use the method registerMessageHandler, we can provide the option maxMessageAutoRenewLockDurationInSeconds. It means that the maximum duration in seconds until which the lock on the message will be renewed by the SDK automatically. But please note that this auto-renewal stops once the message is settled or once the user-provided onMessage handler completes its execution. For more details, please refer to here and here

    Besides, if you want to manually renew the lock, please set maxMessageAutoRenewLockDurationInSeconds as zero.

    For example

    import {
      SendableMessageInfo,
      ServiceBusClient,
      OnMessage,
      OnError,
      delay,
      ReceiveMode,
      ServiceBusMessage,
      MessagingError,
    } from "@azure/service-bus";
    const connectionString =
      "";
    const queueName = "myqueue";
    let receivedMessage: ServiceBusMessage;
    let elapsedTime = 0;
    const interval = 1000 * 10;
    const testDurationInMilliseconds = 1000 * 60;
    async function sendMessage(): Promise<void> {
      const ns = ServiceBusClient.createFromConnectionString(connectionString);
      const client = ns.createQueueClient(queueName);
      try {
        const sender = client.createSender();
    
        const message: SendableMessageInfo = {
          messageId: "test",
          body: "test",
          label: `test`,
        };
        console.log("send");
        await sender.send(message);
        await sender.close();
      } finally {
        await client.close();
        await ns.close();
      }
    }
    async function receiveMessage(): Promise<void> {
      const ns = ServiceBusClient.createFromConnectionString(connectionString);
    
      const client = ns.createQueueClient(queueName);
    
      try {
        const receiver = client.createReceiver(ReceiveMode.peekLock);
        const receiverPromise = new Promise((resolve, _reject) => {
          const onMessageHandler: OnMessage = async (brokeredMessage) => {
            receivedMessage = brokeredMessage;
            console.log("Received message: ", receivedMessage.messageId);
            const startTime = Date.now();
            while (elapsedTime < testDurationInMilliseconds) {
              // simulate the user making an async call that takes time.
              await delay(interval);
              const data = await receiver.renewMessageLock(receivedMessage);
              elapsedTime = Date.now() - startTime;
    
              // log how long we've executed.
              console.log(`still executing after ${elapsedTime}`);
              console.log(data.toJSON());
              console.log("\n");
            }
    
            await brokeredMessage.complete();
            console.log("Completed message: ", receivedMessage.messageId);
          };
    
          const onErrorHandler: OnError = (err) => {
            if ((err as MessagingError).retryable === true) {
              console.log(
                "Receiver will be recreated. A recoverable error occurred:",
                err
              );
              resolve();
            } else {
              console.log("Error occurred: ", err);
            }
          };
    
          receiver.registerMessageHandler(onMessageHandler, onErrorHandler, {
            autoComplete: false,
            maxMessageAutoRenewLockDurationInSeconds: 0,
          });
        });
        await receiverPromise;
        await receiver.close();
      } finally {
        await client.close();
        await ns.close();
      }
    }