
Processing data from Event Hubs with Reactive X streams


I'd like to use the Reactive Extensions for .NET (Rx.NET) to process events from Azure Event Hubs.

How do I create the observable stream from the messages I get from EventProcessorHost?

I've found no reference for this scenario. Am I missing something fundamental here? Am I trying to do something that doesn't make sense?


Solution

  • Whether it makes sense is up to you. Why do you need or want to use Reactive Extensions? Most scenarios involve using Azure Stream Analytics to transform and query the data in near real time.

    That said, processing the data with an EventProcessor can be done. Here is a very rough sketch to get you started:

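    // Assumes the classic EventProcessorHost SDK (Microsoft.ServiceBus.Messaging)
    // and the System.Reactive package. The newer Microsoft.Azure.EventHubs.Processor
    // IEventProcessor additionally requires a ProcessErrorAsync method.
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;
    
    // The host creates one instance per partition; it only forwards events to the shared Rx stream.
    public class EventProcessor : IEventProcessor
    {
        private readonly EventStreamProcessor eventStreamProcessor;
    
        public EventProcessor(EventStreamProcessor eventStreamProcessor)
        {
            this.eventStreamProcessor = eventStreamProcessor;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> eventDatas)
        {
            foreach (var eventData in eventDatas)
                eventStreamProcessor.Post(eventData);
    
            return Task.CompletedTask;
        }
    
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            return Task.CompletedTask;
        }
    }
    
    // Owns the Rx pipeline; share a single instance across all EventProcessors.
    public sealed class EventStreamProcessor : IDisposable
    {
        private readonly Subject<EventData> dataStream = new Subject<EventData>();
        private readonly IDisposable subscription;
    
        public EventStreamProcessor()
        {
            subscription = dataStream
                .Synchronize()   // Post is called concurrently from several partitions; serialize notifications
                .AsObservable()
                .Subscribe(eventData =>
                {
                    // Do something
                });
        }
    
        public void Dispose()
        {
            dataStream.OnCompleted();
            subscription.Dispose();
            dataStream.Dispose();
        }
    
        public void Post(EventData eventData)
        {
            dataStream.OnNext(eventData);
        }
    }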
    

    A couple of things to consider:

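    • Reliable checkpointing is going to be difficult. You can call await context.CheckpointAsync(); at any moment after pushing data into the Rx stream, but once the pipeline contains asynchronous or buffering operators (ObserveOn, Buffer and the like) you do not know whether it has processed the data yet. If you checkpoint too early and the process crashes, those events will not be delivered again.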
    • Multiple EventProcessor instances can be alive at any given moment (the host creates one per partition it owns), so make sure you push the data to a single Rx stream. Use a singleton, or inject the same Rx processor into each EventProcessor through an implementation of the IEventProcessorFactory interface.
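One possible way around the checkpointing problem is to let the pipeline acknowledge each event. This is a minimal sketch, not part of Rx or the Event Hubs SDK: AcknowledgingStream<T> and PostAsync are hypothetical names, and it assumes the System.Reactive NuGet package. Each posted item carries a TaskCompletionSource that the subscriber completes after handling the item, so the caller can await it before checkpointing. The ObserveOn(TaskPoolScheduler.Default) stage stands in for whatever asynchronous step a real pipeline has. That kind of step is what makes an explicit acknowledgement necessary.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical helper: every posted item travels with a TaskCompletionSource that the
// subscriber completes once the item has been handled, so the poster knows when
// it is safe to checkpoint.
public sealed class AcknowledgingStream<T> : IDisposable
{
    private readonly Subject<(T Item, TaskCompletionSource<bool> Done)> subject =
        new Subject<(T Item, TaskCompletionSource<bool> Done)>();
    private readonly IDisposable subscription;

    public AcknowledgingStream(Action<T> handle)
    {
        subscription = subject
            .Synchronize()                        // serialize OnNext calls coming from several partitions
            .ObserveOn(TaskPoolScheduler.Default) // handling happens later, on a thread-pool thread
            .Subscribe(entry =>
            {
                try
                {
                    handle(entry.Item);
                    entry.Done.TrySetResult(true);  // acknowledge: item fully processed
                }
                catch (Exception ex)
                {
                    entry.Done.TrySetException(ex); // report the failure to the poster; the stream stays alive
                }
            });
    }

    // Returns a task that completes only after the subscriber has handled the item.
    public Task PostAsync(T item)
    {
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        subject.OnNext((item, done));
        return done.Task;
    }

    public void Dispose()
    {
        subject.OnCompleted();
        subscription.Dispose();
        subject.Dispose();
    }
}
```

Assuming a single shared AcknowledgingStream<EventData> instance named stream (for example injected through the IEventProcessorFactory), ProcessEventsAsync could wait for the whole batch before checkpointing:

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Requires using System.Linq; stream is the hypothetical shared AcknowledgingStream<EventData>.
        await Task.WhenAll(messages.Select(m => stream.PostAsync(m)));
        await context.CheckpointAsync();
    }

If any handler throws, Task.WhenAll rethrows and CheckpointAsync is not called for that batch. Operators that merge items, such as Buffer, would have to complete every TaskCompletionSource in the merged group for this to keep working.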

    I think the answer to this Q&A is also relevant to you.