Search code examples
pythonasync-awaitazure-eventhub

How to stop an Azure Event Hub Consumer Client in Python


I am running into some trouble with Azure Event Bub with Python. Below is my strater code for connection (Taken from microsoft docs)

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main()) 

Here, the receiver/consumer keeps listening. If I remove any of the awaits the consumer throws an error. Does anyone know how to stop the consumer after running for some time like timeout).


Solution

  • @Abhishek

    There are 2 options here :

    1. You could stop listening when there is an inactivity for certain period time.
    2. You could stop listening after fixed duration.

    Have detailed both in below steps.

    OPTION 1

    You could use the max_wait_time parameter in order to stop listening in case there is no activity for certain time.

    enter image description here

    I did spin up a simple use case of the above. But you could optimize this further.

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    
    event_hub_connection_str = '<CON_STR>'
    eventhub_name = '<EventHub_NAME>'
    
    
    consumer = EventHubConsumerClient.from_connection_string(
         conn_str=event_hub_connection_str,
           consumer_group='$Default',
           eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
       )
    
    #this event gets called when the message is received or Max_Wait_time is clocked
    async def on_event(partition_context, event):
           print(event) #Optional - to see output
           #Checks whether there is any event returned None. None is returned when this event is called after the Max_Wait_time is crossed
           if(event !=None):
                print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
                #you can update other code like updating blob store
                    
           else:
               print("Timeout is Hit")
               #updating the 
               global receive
               receive = False
    
      
    async def close():
        print("Closing the client.")
        await consumer.close()
        print("Closed")
        
    async def main():
        recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event,max_wait_time=15))
        while(True): # keep receiving for 3 seconds
            await asyncio.sleep(3) 
            if(receive != True):
                print("Cancelling the Task")
                recv_task.cancel()  # stop receiving by cancelling the task
                break;
      
       
    
    receive = True
    asyncio.run(main())
    asyncio.run(close())#closing the Client
    

    With regards to the above code. If there is no activity 15 seconds the async task gets cancelled and the consumer clients gets closed. The program is eventually exited gracefully.

    OPTION 2

    If you are looking for a code in which the you would like to make client to listen for fixed time like 1 hour or some thing. You could refer the below code

    Reference Code

    event_hub_connection_str = '<>'
    eventhub_name = '<>'
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    
    consumer = EventHubConsumerClient.from_connection_string(
           conn_str=event_hub_connection_str,
           consumer_group='$Default',
           eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
       )
    async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           print("Received event from partition: {}".format(partition_context.partition_id))
    
       # The receive method is a coroutine which will be blocking when awaited.
       # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
       
    async def main():
            recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
            await asyncio.sleep(15)  # keep receiving for 3 seconds
            recv_task.cancel()  # stop receiving
    
    async def close():
        print("Closing.....")
        await consumer.close()
        print("Closed") 
    
    asyncio.run(main())
    asyncio.run(close())#closing the Client
    

    The below code that is responsible for the client to be listening for a certain time :

    recv_task =
    asyncio.ensure_future(consumer.receive(on_event=on_event))    
    await asyncio.sleep(3)  # keep receiving for 3 seconds    
    recv_task.cancel()
    

    You could increase the time as per your need.