I have this code:
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def run_events_listener():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        os.getenv('CHECKPOINT_STORE_URI'), os.getenv('CHECKPOINT_CONTAINER'))
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        os.getenv('EVENTS_HUB_URI'), consumer_group="$Default",
        eventhub_name=os.getenv('EVENTHUB_NAME'), checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")
To implement a graceful stop, I have to close the EventHubConsumerClient correctly. run_events_listener runs as an asyncio.Task. How should I close it?
There is no action that callers need to explicitly take to shut down cleanly. The consumer internally manages the startup/shutdown steps via a try/finally block.
Terminating your process or cancelling the asyncio task should be sufficient to clean things up.
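
To illustrate why cancelling the task is enough, here is a minimal sketch of the cancellation path. It does not use the Azure SDK: FakeConsumerClient is a hypothetical stand-in that only mimics the async context manager shape of the real client, and shutdown_demo is an illustrative helper name. The point is that cancelling the task raises CancelledError inside the async with block, so the context manager's exit code still runs.

```python
import asyncio


class FakeConsumerClient:
    """Hypothetical stand-in for EventHubConsumerClient (not the Azure API)."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and also when the surrounding task is cancelled.
        self.closed = True

    async def receive(self):
        # Simulates a receive call that blocks until the task is cancelled.
        await asyncio.Event().wait()


async def run_events_listener(client):
    async with client:
        await client.receive()


async def shutdown_demo():
    client = FakeConsumerClient()
    task = asyncio.create_task(run_events_listener(client))
    await asyncio.sleep(0)  # let the listener start and enter `async with`

    task.cancel()  # request a graceful stop
    try:
        await task  # wait until the cleanup inside the task has finished
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return client.closed


if __name__ == "__main__":
    print("client closed:", asyncio.run(shutdown_demo()))
```

In your own code the same pattern would apply to the task that wraps run_events_listener: call cancel() on it, then await it and ignore the resulting CancelledError, so that the cleanup has finished before the event loop shuts down.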