Tags: python, azure, azure-eventhub, azure-blob-storage, keyerror

Azure Event Hubs (Python): checkpointing with blob storage - KeyError in EventProcessor when checkpointing is enabled


I'm having an issue with blob storage checkpointing in Event Hubs. My application runs fine if I don't pass checkpoint_store when creating the consumer client. As soon as I pass checkpoint_store and run my code, it logs the following exception:

EventProcessor instance 'xxxxxxxxxxx' of eventhub <name of my eventhub> consumer group <name of my consumer group>. An error occurred while load-balancing and claiming ownership. The exception is KeyError('ownerid'). Retrying after xxxx seconds

The only GitHub issue I could find that even mentions this kind of error is this one; however, the issue itself was never resolved, and the person who reported it ended up using a different library instead.

The relevant libraries I'm using are azure-eventhub and azure-eventhub-checkpointstoreblob-aio.

Here are relevant snippets of the code I'm using (I used this tutorial as a guide):

import asyncio

from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# blob_connection_string, container_name, connection_str, consumer_group and
# input_eventhub_name are defined elsewhere in the application.


async def on_event(partition_context, event):
    await partition_context.update_checkpoint(event)
    # <do stuff with event data>


async def main():
    # Create the clients inside the coroutine so they use the running event loop.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        blob_connection_string, container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=input_eventhub_name,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await client.receive(on_event=on_event)
        print("Terminated.")


if __name__ == '__main__':
    asyncio.run(main())

The issue seems to be solely with blob storage checkpointing; if I comment out 'checkpoint_store=checkpoint_store' when creating the consumer client everything runs with no issues.

The connection to blob storage appears to be fine: after some digging I found that two folders, 'checkpoint' and 'ownership', had been created in the container (screenshot: blob storage snapshot). The 'ownership' folder contains files with an 'ownerid' entry in their metadata (screenshot: owner files metadata).

I.e. the key definitely exists. What I think is happening is that the EventProcessor is trying to fetch the ownership metadata of these blobs, but is somehow failing to do so. If anyone has any idea as to how to fix this I would very much appreciate it!
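
One way the error could arise even though the partition blobs carry 'ownerid' is that the listing also returns at least one entry without that key. The following is only a sketch of that assumption using stand-in objects, not the library's actual code; build_ownership, first_failure, and the blob names are hypothetical.

```python
from types import SimpleNamespace


def build_ownership(blob):
    # Strict lookup: raises KeyError('ownerid') if the key is absent.
    return {"name": blob.name, "owner_id": blob.metadata["ownerid"]}


# Stand-ins for listed blobs (hypothetical names and values).
blobs = [
    SimpleNamespace(name="ownership/0", metadata={"ownerid": "abc"}),
    SimpleNamespace(name="ownership/1", metadata={}),  # assumed entry without 'ownerid'
]


def first_failure(blobs):
    """Return (blob name, repr of the exception) for the first blob that fails."""
    for blob in blobs:
        try:
            build_ownership(blob)
        except KeyError as exc:
            return blob.name, repr(exc)
    return None


print(first_failure(blobs))
```

If something like this is what happens, a single entry without the key is enough to make the whole ownership listing fail, even when every other blob is fine.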


Solution

  • This looks like a problem retrieving "ownerid" from one of the blobs. Could you test the following scenarios?

    1. Remove everything from the blob container and retry.
    2. If the problem persists, check whether every blob has "ownerid" in its metadata.
    3. If the problem still persists, replace line 144 of file azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py in library azure-eventhub-checkpointstoreblob-aio version 1.1.0 with the following line and retry. It uses dict.get, which returns None instead of raising KeyError when the key is missing:
    "owner_id": blob.metadata.get("ownerid"),
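
Step 2 can be scripted rather than checked by hand in the portal. Below is a minimal sketch, assuming the azure-storage-blob package is installed. blobs_missing_ownerid and check_container are hypothetical helper names, and the optional prefix argument is only there in case you want to restrict the check to the 'ownership' folder.

```python
def blobs_missing_ownerid(blobs, key="ownerid"):
    """Return the names of listed blobs whose metadata lacks `key`."""
    missing = []
    for blob in blobs:
        metadata = blob.metadata or {}  # metadata may be None when not populated
        if key not in metadata:
            missing.append(blob.name)
    return missing


def check_container(connection_string, container_name, prefix=None):
    # Imported lazily so the helper above can be used without the SDK installed.
    from azure.storage.blob import ContainerClient

    container = ContainerClient.from_connection_string(
        connection_string, container_name
    )
    with container:
        # include=["metadata"] asks the service to return metadata with each blob.
        blobs = container.list_blobs(name_starts_with=prefix, include=["metadata"])
        return blobs_missing_ownerid(blobs)
```

Calling check_container(blob_connection_string, container_name) with the same values used for the checkpoint store should return an empty list if every listed blob has the key. Any names it returns are the entries worth inspecting before trying step 3.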