Search code examples
Tags: azure, azure-eventhub

How to create multiple receivers in Azure Event Hub to avoid duplicates?


I have one producer which is sending events to an Event Hub. I want to create 2 receivers to receive events from the Event Hub. How can I implement that?

The code for receiver:

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

#Eventhub access credentials
#NOTE(review): **** are redacted placeholders — fill in real values before running.
connection_str = ****
consumer_group = '$Default'
eventhub_name = ****

#Blobstorage Storage credentials
storage_connection_str = ****
container_name = ****
storageAccount = ****

#For checkpointing in Blob storage
#Checkpoints record each consumer's position per partition so restarts resume correctly.
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

#Initiate BlobServiceClient to access the Blob storage
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
#Fix: pass the container_name variable, not the literal string 'container_name',
#so the data dump goes to the configured container.
container_client = blob_service_client.get_container_client(container_name)  #Dump final data to the Blob storage in append mode.

try:
  container_client.create_container()  #Create new Container in the service
  properties = container_client.get_container_properties()
except ResourceExistsError:
  #The container already being present is expected on re-runs; not an error.
  print("Container already exists.")

#Instantiate a new BlobClient
#blob_client = container_client.get_blob_client("data.csv")


def get_messages():
    """Receive event batches from partition "0" of the Event Hub and checkpoint them.

    Note: this client is NOT given a checkpoint_store, and it pins a single
    partition_id, so multiple copies of this script will all read the same
    events — this is the cause of the duplicates described above.
    """
    client = EventHubConsumerClient.from_connection_string(
        connection_str, consumer_group, eventhub_name=eventhub_name)

    def on_event_batch(partition_context, events):
        #log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
        print("Received event from partition {}".format(
            partition_context.partition_id))  # Since no partition is defined so partition = 0 by default.
        if not events:
            client.close()  # closing the client if there is no event triggered.
        else:
            for event in events:
                list_ = event.body_as_json()
                # Update checkpoint
                partition_context.update_checkpoint()

    try:
        with client:
            client.receive_batch(
                on_event_batch=on_event_batch,
                partition_id="0",  # fixed: `PARTITION` is not a valid keyword; the API expects `partition_id`
            )
            #starting_position="-1", )  # "-1" is from the beginning of the partition.
    except KeyboardInterrupt:
        print('Stopped receiving.')


get_messages()

I have created 2 copies of this code with names consumer1.py and consumer2.py. But both these consumers receive the same events every time.

So, for example, if I send 100 events, I want these two consumers to run in parallel and divide those 100 events between themselves, avoiding duplicates. How do I achieve this?


Solution

  • So finally I found the solution: create multiple consumers under the same consumer group, which can consume events in parallel and share the load among each other.

    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
    from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
    from azure.core.exceptions import ResourceExistsError
    from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
    
    #Eventhub access credentials
    #NOTE(review): **** are redacted placeholders — fill in real values before running.
    connection_str = ****
    consumer_group = '$Default'
    eventhub_name = ****
    
    #Blobstorage Storage credentials
    storage_connection_str = ****
    container_name = ****
    storageAccount = ****
    
    
    #For checkpointing in Blob storage
    #Sharing one checkpoint store across consumers is what enables load balancing
    #within the consumer group.
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    
    #Initiate BlobServiceClient to access the Blob storage
    blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
    #NOTE(review): container name is hardcoded here while the checkpoint store uses
    #`container_name` — presumably the data dump and the checkpoints live in separate
    #containers; confirm this is intentional.
    container_client = blob_service_client.get_container_client('nsc-container') 
    #Dump final data to the Blob storage in append mode.
    
    try:
      container_client.create_container()  #Create new Container in the service
      properties = container_client.get_container_properties()
    except ResourceExistsError:
      #Expected on re-runs; the existing container is reused.
      print("Container already exists.")
    
    #Instantiate a new BlobClient
    #blob_client = container_client.get_blob_client("data.csv")
    
    def get_messages():
        """Receive event batches from the Event Hub with checkpoint-based load balancing.

        Because the client is created with a shared checkpoint_store and no
        partition_id is pinned, multiple copies of this script in the same
        consumer group divide the partitions among themselves — no duplicates.
        """
        import time  # fixed: `time` was used below but never imported

        client = EventHubConsumerClient.from_connection_string(
            connection_str,
            consumer_group,
            eventhub_name=eventhub_name,
            checkpoint_store=checkpoint_store,  # shared store enables load balancing
        )

        def on_event_batch(partition_context, events):
            #log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
            print("Received event from partition {}".format(
                partition_context.partition_id))
            # fixed: the counting/timing statements below had fallen out of this
            # function's body due to mangled indentation.
            line_count = 0
            start_time = time.time()
            cnt = 0
            if not events:
                client.close()  # closing the client if there is no event triggered.
            else:
                for event in events:
                    list_ = event.body_as_json()
                    cnt += 1
                    # Update checkpoint
                    partition_context.update_checkpoint()
                print("Number of events received: ",cnt)
            line_count = line_count+ cnt
            end_time = time.time()
            run_time = end_time - start_time
            print("\nTotal Received {} records in {} seconds.".format(line_count, run_time))

        try:
            with client:
                # No partition_id argument: with a specified partition_id,
                # load balancing would be disabled.
                client.receive_batch(
                    on_event_batch=on_event_batch,)
        except KeyboardInterrupt:
            print('Stopped receiving.')

    get_messages()
    

    Now create as many copies of the code as you need consumers, saving them as consumer_1.py and so on. Also, make sure to keep the number of partitions equal to the number of consumers for best efficiency.