Tags: .net-core, azure-eventhub, azure-sdk-.net

How to configure EventProcessorClient to read events only for a particular partition key (not partition id)?


I have an event hub with 2 partitions and used the following code to send events to it with different partition keys (based on the documentation at https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs). I am using the Azure.Messaging.EventHubs library for .NET (with .NET Core 3.1).

using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    // Every event in this batch is published with the partition key "MyPartitionA"
    using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });

    // TryAdd returns false if the event does not fit in the batch
    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
    await producer.SendAsync(eventBatch);

    // Every event in this batch is published with the partition key "MyPartitionB"
    using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });

    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));

    await producer.SendAsync(eventBatch2);
}

As you can see, I sent the first batch of two events with the partition key MyPartitionA and the second batch of two events with the partition key MyPartitionB. Interestingly, events for both partition keys went to the same partition (partition 0 of the event hub).
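Both keys landing in one partition is expected rather than a bug. A partition key is hashed and the hash is reduced to one of the available partitions, so with only two partitions, two distinct keys share a partition about half the time (assuming the hash spreads keys evenly). The minimal sketch below illustrates the idea. It is an assumption made for illustration only: it uses FNV-1a as a stand-in hash plus a simple modulo, which is not the algorithm the Event Hubs service uses, so the partition numbers it prints will not match what the service picks. It is written as C# 9 top-level statements.

```csharp
using System;
using System.Text;

// ILLUSTRATION ONLY: FNV-1a is a stand-in hash, NOT the one used by the
// Event Hubs service. string.GetHashCode() is avoided on purpose because
// .NET Core randomizes it per process, so it would not be stable across runs.
static uint Fnv1a(string key)
{
    uint hash = 2166136261; // FNV-1a 32-bit offset basis
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash = unchecked(hash * 16777619); // FNV-1a 32-bit prime, wraps on overflow
    }
    return hash;
}

// Reduce the hash to a partition index in [0, partitionCount).
static int PartitionFor(string key, int partitionCount) =>
    (int)(Fnv1a(key) % (uint)partitionCount);

// With 2 partitions, every key maps to 0 or 1, so distinct keys often collide.
foreach (var key in new[] { "MyPartitionA", "MyPartitionB" })
{
    Console.WriteLine($"{key} -> partition {PartitionFor(key, 2)}");
}
```

The point is only that the mapping is deterministic (same key, same partition) and many-to-one; a key does not reserve a partition for itself.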

On the receiving side, I am using the code sample at https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing, shown below (with the Azure.Messaging.EventHubs.Processor library for .NET):

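using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

// DoSomethingWithTheEvent and DoSomethingWithTheError stand for application-specific code
async Task processEventHandler(ProcessEventArgs eventArgs)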
{
    try
    {
        // Perform the application-specific processing for an event
        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error
        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }   
}

private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
    var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
    var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

    processor.ProcessEventAsync += processEventHandler;
    processor.ProcessErrorAsync += processErrorHandler;
    
    await processor.StartProcessingAsync();
    
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
        
        await processor.StopProcessingAsync();
    }
    finally
    {
        // To prevent leaks, the handlers should be removed when processing is complete
        processor.ProcessEventAsync -= processEventHandler;
        processor.ProcessErrorAsync -= processErrorHandler;
    }
}

However, I could not find a way to make the code above receive only the events for a given partition key (say, MyPartitionA) and not the events for another partition key (say, MyPartitionB).

  1. Is it possible to register the processor to receive events based on a particular partition key (rather than a partition ID)?
  2. If events with the partition keys MyPartitionA and MyPartitionB are both sent to partition 0 of the event hub, is it still possible to receive only the events for a single partition key (say, MyPartitionA), even though events with other partition keys reside in the same partition?

Solution

  • You cannot read events based on a partition key using any of the clients in the SDK.

    The partition key is a synthetic concept that is not retained for an event after it has been published. When you publish with a partition key, that key is hashed and the resulting value is used to select the partition the event is routed to. The intent is to ensure that related events are routed to the same partition without the publisher needing to know which partition was selected; it offers no guarantee of fair distribution across partitions.

    To accomplish the filtering you're looking for, store your partition key as an application property on the event (in EventData.Properties) and then use that value as a filter in your ProcessEventAsync handler. Note that you will still receive all events from all partitions; consuming every partition is the primary goal of the EventProcessorClient.

    I don't believe we understand enough of the context around your application scenario to determine the best approach, but based on what we do know, I would suggest considering an alternative. Since you seem to need to read a specific set of events, publishing them to a well-known partition by its ID rather than by a key may help. You would then be able to read events exclusively from that partition using the EventHubConsumerClient.ReadEventsFromPartitionAsync method. That would, of course, require that you also take explicit control of where the other events in your application are published, to ensure that they are routed to your second partition.
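The application-property filtering suggested in the answer can be sketched as below. This is a minimal sketch under stated assumptions: the property name `AppPartitionKey` and the helper `HasKey` are hypothetical and chosen for illustration; the only SDK member relied on is `EventData.Properties` (an `IDictionary<string, object>`), and the SDK calls appear only in comments so the filtering logic itself runs without an event hub. It is written as C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical application-property name; publisher and consumer only need to agree on it.
const string KeyProperty = "AppPartitionKey";

// Publisher side (in the producer code): tag each event before adding it to a batch, e.g.
//     var eventData = new EventData(Encoding.UTF8.GetBytes("First"));
//     eventData.Properties[KeyProperty] = "MyPartitionA";
//     eventBatch.TryAdd(eventData);
//
// Consumer side (in processEventHandler): skip events tagged with any other key, e.g.
//     if (!HasKey(eventArgs.Data.Properties, "MyPartitionA")) return;

// Hypothetical helper: true when the event's application properties carry the wanted key.
bool HasKey(IDictionary<string, object> properties, string wantedKey) =>
    properties.TryGetValue(KeyProperty, out var value)
    && value is string key
    && string.Equals(key, wantedKey, StringComparison.Ordinal);

// Simulated application properties for two received events.
var fromA = new Dictionary<string, object> { [KeyProperty] = "MyPartitionA" };
var fromB = new Dictionary<string, object> { [KeyProperty] = "MyPartitionB" };

Console.WriteLine(HasKey(fromA, "MyPartitionA")); // True
Console.WriteLine(HasKey(fromB, "MyPartitionA")); // False
```

The filter runs after an event has been delivered, so the processor still reads every event from every partition; it only keeps your handler from acting on events for other keys.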