Search code examples
Tags: python-3.x, azure-eventhub

EventHub and Receive


All,

I modified the sample receive Python script for Azure Event Hubs slightly, but when I run it, it goes into a loop, fetching the same events over and over. I'm not sending any events to the event hub, since I only want to read what is already there, and I don't see a while loop in the script. How is this happening, and how do I make it stop after it has read all the events currently in the event hub?

Thanks
grajee

# https://learn.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python#consume-events-from-an-event-hub
import logging
import threading

from azure.eventhub import EventHubConsumerClient

# Connection settings for the event hub to read from.
# NOTE: avoid committing a real SharedAccessKey to source control; load it from
# an environment variable or a secret store instead.
connection_str = 'Endpoint=sb://testhubns01.servicebus.windows.net/;SharedAccessKeyName=getevents;SharedAccessKey=testtestest='
consumer_group = '$Default'
eventhub_name = 'testpart'
client = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group,
    eventhub_name=eventhub_name,
)

# Send INFO-and-above records to the console via the root logger.
# Messages from on_event are logged under the "azure.eventhub" logger name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("azure.eventhub")

def on_event(partition_context, event):
    """Log one received event and record it as the partition's checkpoint.

    Args:
        partition_context: context of the partition the event came from; its
            ``partition_id`` is logged and ``update_checkpoint()`` is called.
        event: the received event, or ``None``. The SDK calls the handler
            with ``None`` when ``receive()`` is given ``max_wait_time`` and no
            event arrived within that interval.
    """
    if event is None:
        # Idle notification: there is nothing to log or checkpoint.
        return
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info(
        'Received event from partition: "%s"   :  "%s"',
        partition_context.partition_id,
        event.body_as_str(),
    )
    # Without a checkpoint_store on the client, this checkpoint is kept only in
    # memory, so a new run starting at "-1" reads the partition again.
    partition_context.update_checkpoint(event)

# client.receive() is a blocking call: it keeps waiting for new events until the
# client is closed, which is why the script never finishes on its own. Run it on
# a background thread and close the client after RECEIVE_DURATION seconds.
RECEIVE_DURATION = 15  # seconds; increase this if the hub holds a large backlog

receiver = threading.Thread(
    target=client.receive,
    kwargs={
        "on_event": on_event,
        "starting_position": "-1",  # "-1" is from the beginning of the partition.
        # To receive from a single partition, also pass "partition_id": "0".
    },
    daemon=True,
)
receiver.start()
try:
    # Waits up to RECEIVE_DURATION seconds; returns sooner if receive() ends.
    receiver.join(timeout=RECEIVE_DURATION)
finally:
    # Closing the client stops receive(). This replaces both the original
    # `with client:` block and the redundant trailing client.close().
    client.close()
receiver.join()

Solution

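  • `receive()` is a blocking call by design: it keeps listening for new events until the client is closed, so the script never finishes on its own. To stop, close the client, for example from another thread or task after a timeout. In addition, without a checkpoint store, `update_checkpoint()` keeps checkpoints only in memory, so every new run with `starting_position="-1"` reads each partition again from the beginning. The following sample from the azure-eventhub README passes a `checkpoint_store`, which persists checkpoints so that a later run continues from the last checkpoint instead of `starting_position`: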

    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    # Placeholder settings: replace every value below with your own resources.
    # Event Hubs namespace connection string, consumer group and event hub name:
    connection_str: str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
    consumer_group: str = '<< CONSUMER GROUP >>'
    eventhub_name: str = '<< NAME OF THE EVENT HUB >>'
    # Storage account connection string and blob container for the checkpoint store:
    storage_connection_str: str = '<< CONNECTION STRING FOR THE STORAGE >>'
    container_name: str = '<<NAME OF THE BLOB CONTAINER>>'
    
    async def on_event(partition_context, event):
        """Handle a single received event, then checkpoint it.

        Placeholder handler: put the event processing before the checkpoint.
        """
        checkpoint = partition_context.update_checkpoint(event)
        # Checkpointing every event keeps the sample simple; checkpointing
        # only every N events performs better.
        await checkpoint
    
    async def receive(client):
        """Receive events with ``client``, dispatching each to ``on_event``."""
        receive_options = {
            "on_event": on_event,
            "starting_position": "-1",  # "-1" is from the beginning of the partition.
        }
        await client.receive(**receive_options)
    
    async def main():
        """Build a consumer client backed by a blob checkpoint store and receive.

        The checkpoint store is created from the storage connection string and
        blob container name. The client uses it for load balancing and
        checkpointing; passing None instead disables load balancing.
        """
        checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
        client = EventHubConsumerClient.from_connection_string(
            connection_str,
            consumer_group,
            eventhub_name=eventhub_name,
            # Fixed: the markdown bold markers (**) fused into this line were a SyntaxError.
            checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
        )
        # `async with` closes the client when receive() returns or raises.
        async with client:
            await receive(client)
    
    if __name__ == '__main__':
        # asyncio.run() creates a fresh event loop, runs main() to completion
        # and closes the loop. Calling asyncio.get_event_loop() with no running
        # loop is deprecated and raises RuntimeError on Python 3.14+.
        asyncio.run(main())