I have one producer sending events to an Event Hub, and I want to create two receivers that consume those events. How do I implement that?
The code for the receiver:
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Event Hub access credentials
connection_str = ****
consumer_group = '$Default'
eventhub_name = ****

# Blob storage credentials
storage_connection_str = ****
container_name = ****
storageAccount = ****

# For checkpointing in Blob storage
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# Initiate BlobServiceClient to access the Blob storage
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client(container_name)  # Dump final data to the Blob storage in append mode.
try:
    container_client.create_container()  # Create a new container in the service
    properties = container_client.get_container_properties()
except ResourceExistsError:
    print("Container already exists.")

# Instantiate a new BlobClient
# blob_client = container_client.get_blob_client("data.csv")

def get_messages():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

    def on_event_batch(partition_context, events):
        # log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
        print("Received event from partition {}".format(partition_context.partition_id))
        if len(events) == 0:
            client.close()  # Close the client if no event was received.
        else:
            for event in events:
                list_ = event.body_as_json()
            # Update checkpoint
            partition_context.update_checkpoint()

    try:
        with client:
            client.receive_batch(
                on_event_batch=on_event_batch,
                partition_id="0",  # Receive only from partition "0".
                # starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')

get_messages()
I have created two copies of this code, named consumer1.py and consumer2.py, but both consumers receive the same events every time.
For example, if I send 100 events, I want the two consumers to run in parallel and divide those 100 events between themselves without duplicates. How can I achieve this?
I finally found the solution: create multiple consumers under the same consumer group, backed by a shared checkpoint store, so that they consume events in parallel and share the load among themselves.
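The key difference from the receiver above comes down to two things: the consumer client is created with a checkpoint_store, and receive_batch() is called without a partition_id. A minimal sketch of just that change (placeholder values; the full listing follows):

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Placeholder credentials -- substitute your own values.
storage_connection_str = "<storage-connection-string>"
container_name = "<checkpoint-container>"
connection_str = "<eventhub-connection-string>"
eventhub_name = "<eventhub-name>"

checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
client = EventHubConsumerClient.from_connection_string(
    connection_str,
    '$Default',
    eventhub_name=eventhub_name,
    checkpoint_store=checkpoint_store,  # ownership + checkpoints persisted in Blob storage
)
# Call client.receive_batch(on_event_batch=...) WITHOUT a partition_id so that
# every consumer in the group takes part in load balancing.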
import time
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.eventhub import EventHubSharedKeyCredential, EventData, EventHubConsumerClient
from azure.core.exceptions import ResourceExistsError
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Event Hub access credentials
connection_str = ****
consumer_group = '$Default'
eventhub_name = ****

# Blob storage credentials
storage_connection_str = ****
container_name = ****
storageAccount = ****

# For checkpointing in Blob storage
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)

# Initiate BlobServiceClient to access the Blob storage
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
container_client = blob_service_client.get_container_client(container_name)  # Dump final data to the Blob storage in append mode.
try:
    container_client.create_container()  # Create a new container in the service
    properties = container_client.get_container_properties()
except ResourceExistsError:
    print("Container already exists.")

# Instantiate a new BlobClient
# blob_client = container_client.get_blob_client("data.csv")

def get_messages():
    # Passing checkpoint_store here is what enables load balancing across
    # all consumers in the same consumer group.
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    def on_event_batch(partition_context, events):
        # log.info("Partition {}, Received count: {}".format(partition_context.partition_id, len(events)))
        print("Received event from partition {}".format(partition_context.partition_id))
        line_count = 0
        start_time = time.time()
        cnt = 0
        if len(events) == 0:
            client.close()  # Close the client if no event was received.
        else:
            for event in events:
                list_ = event.body_as_json()
                cnt += 1
            # Update checkpoint
            partition_context.update_checkpoint()
            print("Number of events received: ", cnt)
            line_count = line_count + cnt
            end_time = time.time()
            run_time = end_time - start_time
            print("\nTotal Received {} records in {} seconds.".format(line_count, run_time))

    try:
        with client:
            client.receive_batch(
                on_event_batch=on_event_batch,
            )  # No partition_id given: specifying one would disable load balancing.
    except KeyboardInterrupt:
        print('Stopped receiving.')

get_messages()
Now create as many copies of this code as you need (consumer_1.py, consumer_2.py, and so on) and run them in parallel. For best efficiency, keep the number of partitions equal to the number of consumers: within a consumer group each partition is processed by only one consumer at a time, so any extra consumers would sit idle.
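If you would rather not maintain N copies of the same file, one possible alternative (a sketch, not part of the original solution) is to launch several consumers from a single script with Python's multiprocessing. Here run_consumer and NUM_CONSUMERS are hypothetical names, and get_messages() is assumed to be the function defined above:

import multiprocessing

def run_consumer(name):
    # Each process builds its own EventHubConsumerClient inside
    # get_messages(), so nothing is shared between processes except
    # the Blob-backed checkpoint store.
    print("Starting", name)
    get_messages()

if __name__ == "__main__":
    NUM_CONSUMERS = 2  # ideally equal to the number of partitions
    processes = [
        multiprocessing.Process(target=run_consumer, args=("consumer_{}".format(i),))
        for i in range(NUM_CONSUMERS)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()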