
Azure Service Bus message lock auto-renewal not working


I have a Python service that uses an Azure Service Bus topic for sending and receiving messages. I am using azure-servicebus==7.12.2.

The maximum lock duration I can set in the Azure portal is 5 minutes, but processing a message from the topic takes around 60 minutes. So I am using AutoLockRenewer, as suggested in a couple of posts online.

Here is my code:

async def receive_messages_from_topic(
    servicebus_client: ServiceBusClient,
    topic_name: str,
    executor: Any,  # type: ignore
):
    async with servicebus_client:
        renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
        async with servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=SUBSCRIPTION_NAME,
            auto_lock_renewer=renewer,  # type: ignore
        ) as receiver:
            logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
            while True:
                try:
                    messages = await receiver.receive_messages()
                    for message in messages:
                        # Process the message
                        logger.info(
                            f"Received message from topic: {topic_name}, msg: {str(message)}"
                        )

                        with make_temp_directory() as temp_dir:
                            try:
                                bytes_str = str(message)
                                bytes = ast.literal_eval(bytes_str)
                                executor(topic_name, bytes, temp_dir)
                                await receiver.complete_message(message)
                            except Exception as e:
                                logger.exception(
                                    f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}"
                                )
                                await receiver.dead_letter_message(message)

                    if len(messages) == 0:
                        # Small sleep to avoid busy looping
                        await asyncio.sleep(60)
                except Exception as e:
                    logger.exception(f"Error processing message for topic: {topic_name}, {e}")

executor(topic_name, bytes, temp_dir) completes the processing in about 40 minutes with no issues or errors. But when I then try to mark the message as complete in the loop above, it throws this exception:

Traceback (most recent call last):
  File "/home/vscode/.cache/bazel/_bazel_vscode/d9fd8dd9485b3d1d1994c3846baefa35/execroot/_main/bazel-out/k8-opt-release/bin/tools/service_bus/run/run.runfiles/_main/tools/service_bus/run/run.py", line 72, in receive_messages_from_topic
    await receiver.complete_message(message)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
    await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
    raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.

I also tried setting AutoLockRenewer on the message and on the session, but got the same result. https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/index.html#automatically-renew-message-or-session-locks

I am not sure whether I am missing something or whether something is wrong with my implementation.


Solution

  • AutoLockRenewer was not working because the call to executor is a blocking (synchronous) call. It holds the event loop for the whole processing time, so the renewer's background task never gets a chance to run and renew the lock. I changed the executor to be async and it started working.

    Thanks to Microsoft for the answer: https://github.com/Azure/azure-sdk-for-python/issues/37245
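
To see the effect without a Service Bus namespace, here is a minimal sketch that uses only the standard library. It is an illustration, not the code from the issue: blocking_job is a hypothetical stand-in for the synchronous executor, and heartbeat is a hypothetical stand-in for the renewer's background task. It counts how many heartbeats fire while the job runs, first with the job called directly on the event loop and then with it offloaded via asyncio.to_thread (Python 3.9+).

```python
import asyncio
import time


def blocking_job(seconds: float) -> str:
    """Hypothetical stand-in for the synchronous executor(...) call."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Hypothetical stand-in for the renewer's background task."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run_job(offload: bool, seconds: float = 0.5, interval: float = 0.05) -> int:
    """Run the job and return how many heartbeats fired while it ran."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # yield once so the heartbeat task gets started

    if offload:
        # The job runs in a worker thread; the event loop stays free.
        await asyncio.to_thread(blocking_job, seconds)
    else:
        # The job runs on the event loop thread and starves every other task.
        blocking_job(seconds)

    count = len(ticks)  # read before yielding to the loop again

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return count


if __name__ == "__main__":
    print("heartbeats while blocking:", asyncio.run(run_job(offload=False)))
    print("heartbeats while offloaded:", asyncio.run(run_job(offload=True)))
```

Applied to the question's loop, and assuming executor itself has to stay synchronous, the same idea would be to replace executor(topic_name, bytes, temp_dir) with await asyncio.to_thread(executor, topic_name, bytes, temp_dir). This is an alternative to the fix described above, which was to make the executor async.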