Tags: c#, azure, azureservicebus, azure-iot-hub, azure-eventhub

Improve code to read messages from Event Hub, depending on last message date


The following code connects to an Azure Event Hub, iterates over all partitions, and reads the messages so they can be processed and inserted into a database (still to be done). The code works, but every time it runs IT READS ALL MESSAGES.

This will be deployed as an Azure WebJob, so it will run continuously and process messages in real time.

  1. How can I improve this code to only read the unprocessed messages?
  2. Is there a better way to write the while/foreach section? Would you do it differently?

    static void Main(string[] args)
    {
        ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConfigurationManager.AppSettings["ConnectionString"].ToString());
        builder.TransportType = TransportType.Amqp;
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConfigurationManager.AppSettings["ConnectionString"].ToString());
        EventHubClient client = factory.CreateEventHubClient(ConfigurationManager.AppSettings["eventHubEntity"].ToString());
        EventHubConsumerGroup group = client.GetDefaultConsumerGroup();
    
        CancellationTokenSource cts = new CancellationTokenSource();
        System.Console.CancelKeyPress += (s, e) =>
        {
            e.Cancel = true;
            cts.Cancel();
            Console.WriteLine("Exiting...");
        };
        var d2cPartitions = client.GetRuntimeInformation().PartitionIds;
    
        while (true)
        {
            foreach (string partition in d2cPartitions)
            {
                EventHubReceiver receiver = group.CreateReceiver(partition, DateTime.MinValue);
                EventData data = receiver.Receive();
                Console.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes()));
                var dateLastMessage = data.EnqueuedTimeUtc.ToLocalTime();
                receiver.Close();
                client.Close();
                factory.Close();
            }
        }
    }
    

Solution

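  • A bare EventHubReceiver does not track what you have already processed: the code above creates the receiver with DateTime.MinValue, so every pass starts again from the oldest retained message. Instead, use an EventProcessorHost, which supports checkpoints that let you resume from the last processed message.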

    See http://blogs.biztalk360.com/understanding-consumer-side-of-azure-event-hubs-checkpoint-initialoffset-eventprocessorhost/ and https://blogs.msdn.microsoft.com/servicebus/2015/01/16/event-processor-host-best-practices-part-1/ for background reading.

    See https://azure.microsoft.com/en-us/documentation/articles/event-hubs-csharp-ephcs-getstarted/#receive-messages-with-eventprocessorhost for a tutorial.

    An EventProcessorHost can be hosted in a continuously running WebJob.
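
    To make the checkpoint idea concrete, here is a minimal sketch of an IEventProcessor registered with an EventProcessorHost, along the lines of the tutorial linked above. It assumes the Microsoft.Azure.ServiceBus.EventProcessorHost NuGet package and an Azure Storage account where the host keeps its leases and checkpoints. The "StorageConnectionString" app setting, the SimpleEventProcessor class name and the Describe helper are illustrative names, not part of the question's code. Treat it as a starting point rather than a finished implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// The host creates one processor instance per partition it holds a lease on.
public class SimpleEventProcessor : IEventProcessor
{
    // Hypothetical helper (not an SDK method): formats one event for logging.
    public static string Describe(string partitionId, string offset, byte[] body)
    {
        return string.Format("partition {0} offset {1}: {2}",
            partitionId, offset, Encoding.UTF8.GetString(body));
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine("Opened partition {0}", context.Lease.PartitionId);
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData data in messages)
        {
            // The database insert from the question would go here.
            Console.WriteLine(Describe(context.Lease.PartitionId, data.Offset, data.GetBytes()));
        }

        // Persist the position of the last event delivered to this processor,
        // so a restarted host resumes after it instead of re-reading everything.
        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}

public static class Program
{
    public static void Main(string[] args)
    {
        string eventHubConnectionString = ConfigurationManager.AppSettings["ConnectionString"];
        string eventHubName = ConfigurationManager.AppSettings["eventHubEntity"];
        // Assumed setting name: storage account used for leases and checkpoints.
        string storageConnectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

        var host = new EventProcessorHost(
            Guid.NewGuid().ToString(),              // unique name for this host instance
            eventHubName,
            EventHubConsumerGroup.DefaultGroupName,
            eventHubConnectionString,
            storageConnectionString);

        // Block until Ctrl+C, mirroring the handler in the question.
        var done = new ManualResetEventSlim(false);
        Console.CancelKeyPress += (s, e) => { e.Cancel = true; done.Set(); };

        host.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
        done.Wait();
        host.UnregisterEventProcessorAsync().Wait();
    }
}
```

    The host takes over the partition loop, which also covers the second question: there is no while/foreach over partitions to maintain. Checkpointing after every batch is the simplest choice, but it costs a storage write per batch. Because a crash between processing and checkpointing causes the batch to be delivered again, the database insert should tolerate duplicates.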