Search code examples
c#observer-patternazureservicebusazure-eventhubreactivex

registering observers with IEventProcessor


This is a part of an IEventProcessor implementation taken from here:

public class SimpleEventProcessor : IEventProcessor
{
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (EventData eventData in events)
        {

        }
    }
}

As new events are added to the EventHub, the ProcessEventsAsync method is invoked and the foreach loop can be used to process the events. I would now like to add Observers to the SimpleEventProcessor using, for example, a ObserverRegistry as discussed here. The proposed ObserverRegistry looks like this:

public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
{
    IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
    {
        yield return new LoanApplicationObserver();
        yield return new OfferObserver();
        // more observers...
    }
}

Unfortunately, there are a few things missing. How do I register several observers with the SimpleEventProcessor so that events are passed from ProcessEventsAsync to all observers and ultimately their When methods?


Solution

  • Full source code is here. Synopsis follows:

    You could define a static event on the SimpleEventProcessor:

    public class SimpleEventProcessor : IEventProcessor 
    {
        public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;        
    
        public SimpleEventProcessor()
        { }
    }
    

    Then raise the OnMessageReceived event in ProcessEventsAsync:

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData message in messages)
        {
            OnMessageReceived(this, new MessageReceivedEventArgs() { ReceivedOn = DateTimeOffset.UtcNow, Message = message });
        }
    }
    

    Very Important: Ensure that all the subscribers are removed on processor close. This is really important, because static event with missing unsubscription can cause memory leaks article explaining this.:

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (OnMessageReceived != null)
        {
            foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
            {
                OnMessageReceived -= subscriber;
            }
        }
    }
    

    Finally you can hook up observers as part of your initialization logic:

    ObserverRegistry registry = new ObserverRegistry();
    foreach (IObserver observer in registry.GetObservers())
    {
        SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
        (sender, e) => observer.When(e));
    }
    

    Example output from the console application:

    SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
    SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
    Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
    Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f
    

    I would like to highlight the following:

    1. In your use case registering a IEventProcessorFactory might be more effective as you have more control over the processor instantiation and disposal.
    2. It is recommended to keep the ProcessEventsAsync method as light and fast as possible. May be creating separate consumer groups is a better option in your case?

    Hope the above answers your question.