This is a part of an IEventProcessor implementation taken from here:
public class SimpleEventProcessor : IEventProcessor
{
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (EventData eventData in events)
        {
        }
    }
}
As new events are added to the EventHub, the ProcessEventsAsync method is invoked and the foreach loop can be used to process them. I would now like to add observers to the SimpleEventProcessor using, for example, an ObserverRegistry as discussed here. The proposed ObserverRegistry looks like this:
public class ObserverRegistry : IObserverRegistry<IProjectionWriterFactory>
{
    // Must be public to implement the interface member implicitly.
    public IEnumerable<object> GetObservers(IProjectionWriterFactory factory)
    {
        yield return new LoanApplicationObserver();
        yield return new OfferObserver();
        // more observers...
    }
}
Unfortunately, there are a few things missing. How do I register several observers with the SimpleEventProcessor so that events are passed from ProcessEventsAsync to all observers and ultimately their When methods?
Full source code is here. Synopsis follows:
You could define a static event on the SimpleEventProcessor:
public class SimpleEventProcessor : IEventProcessor
{
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;

    public SimpleEventProcessor()
    { }
}
Then raise the OnMessageReceived event in ProcessEventsAsync:
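public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData message in messages)
    {
        // Null-conditional invoke: the event is null when no observer has subscribed.
        OnMessageReceived?.Invoke(this, new MessageReceivedEventArgs { ReceivedOn = DateTimeOffset.UtcNow, Message = message });
    }

    // The method is not async, so it must return a completed task explicitly.
    return Task.CompletedTask;
}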
Very important: ensure that all subscribers are removed when the processor closes. A static event keeps every subscriber that was never unsubscribed reachable for the lifetime of the process, which can cause memory leaks (article explaining this):
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
    if (OnMessageReceived != null)
    {
        foreach (EventHandler<MessageReceivedEventArgs> subscriber in OnMessageReceived.GetInvocationList())
        {
            OnMessageReceived -= subscriber;
        }
    }
}
Finally you can hook up observers as part of your initialization logic:
ObserverRegistry registry = new ObserverRegistry();
foreach (IObserver observer in registry.GetObservers())
{
    SimpleEventProcessor.OnMessageReceived += new EventHandler<MessageReceivedEventArgs>(
        (sender, e) => observer.When(e));
}
Example output from the console application:
SimpleEventProcessor: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer1: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
Observer2: bc84f0b9-6e2d-4c4e-9169-c1bd277d3c18
SimpleEventProcessor: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer1: a29d5875-7c53-4a7c-8113-ef7c24c2851f
Observer2: a29d5875-7c53-4a7c-8113-ef7c24c2851f
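The whole pattern (subscribe, raise, clear on close) can also be tried out without the Event Hubs SDK. The following is only a minimal sketch under assumptions: Processor, RecordingObserver, StaticEventDemo and the string-based Message property are hypothetical stand-ins, not the types from the full source, and IObserver is reduced to a single When method.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the answer's event args; Message is a plain string here.
public class MessageReceivedEventArgs : EventArgs
{
    public DateTimeOffset ReceivedOn { get; set; }
    public string Message { get; set; }
}

// Hypothetical observer contract, reduced to one When method.
public interface IObserver
{
    void When(MessageReceivedEventArgs e);
}

// Records every message it sees so the fan-out can be inspected afterwards.
public class RecordingObserver : IObserver
{
    private readonly string name;
    private readonly List<string> log;

    public RecordingObserver(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void When(MessageReceivedEventArgs e)
    {
        log.Add(name + ": " + e.Message);
    }
}

// Stand-in for SimpleEventProcessor, without any Event Hubs dependency.
public class Processor
{
    public static event EventHandler<MessageReceivedEventArgs> OnMessageReceived;

    public void Process(IEnumerable<string> messages)
    {
        foreach (string message in messages)
        {
            // Null-safe raise: nothing happens when there are no subscribers.
            OnMessageReceived?.Invoke(this, new MessageReceivedEventArgs
            {
                ReceivedOn = DateTimeOffset.UtcNow,
                Message = message
            });
        }
    }

    public void Close()
    {
        // Drops every subscriber at once so the static event no longer holds them.
        OnMessageReceived = null;
    }
}

public static class StaticEventDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var observers = new IObserver[]
        {
            new RecordingObserver("Observer1", log),
            new RecordingObserver("Observer2", log)
        };

        foreach (IObserver observer in observers)
        {
            Processor.OnMessageReceived += (sender, e) => observer.When(e);
        }

        var processor = new Processor();
        processor.Process(new[] { "a", "b" });
        processor.Close();
        processor.Process(new[] { "c" }); // ignored: all subscribers were removed
        return log;
    }
}
```

Calling StaticEventDemo.Run() returns four entries (each of the two observers sees "a" and "b"), and "c" reaches nobody because Close cleared the event. Note that because the event is static, closing one processor instance removes the subscribers for every instance.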
I would like to highlight the following: using an IEventProcessorFactory might be more effective, as you have more control over processor instantiation and disposal.
Hope the above answers your question.
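As a rough illustration of the factory idea, the observers can be handed to each processor through its constructor, which avoids the static event entirely. This sketch is self-contained and therefore uses hypothetical stand-ins: IProcessor and IProcessorFactory only mimic the shape of IEventProcessor and IEventProcessorFactory, and IMessageObserver, ListObserver and FactoryDemo are invented for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer contract; messages are plain strings in this sketch.
public interface IMessageObserver
{
    void When(string message);
}

// Stand-in for IEventProcessor (assumption: reduced to a single method).
public interface IProcessor
{
    void Process(IEnumerable<string> messages);
}

// Stand-in for IEventProcessorFactory: one processor is created per partition.
public interface IProcessorFactory
{
    IProcessor CreateProcessor(string partitionId);
}

// The processor owns its observer list, so nothing static outlives it.
public class ObservedProcessor : IProcessor
{
    private readonly IReadOnlyList<IMessageObserver> observers;

    public ObservedProcessor(IReadOnlyList<IMessageObserver> observers)
    {
        this.observers = observers;
    }

    public void Process(IEnumerable<string> messages)
    {
        foreach (string message in messages)
        {
            foreach (IMessageObserver observer in observers)
            {
                observer.When(message);
            }
        }
    }
}

// The factory decides how each processor is built and which observers it gets.
public class ObservedProcessorFactory : IProcessorFactory
{
    private readonly IReadOnlyList<IMessageObserver> observers;

    public ObservedProcessorFactory(IReadOnlyList<IMessageObserver> observers)
    {
        this.observers = observers;
    }

    public IProcessor CreateProcessor(string partitionId)
    {
        return new ObservedProcessor(observers);
    }
}

public class ListObserver : IMessageObserver
{
    private readonly string name;
    private readonly List<string> log;

    public ListObserver(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void When(string message)
    {
        log.Add(name + ": " + message);
    }
}

public static class FactoryDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var observers = new List<IMessageObserver>
        {
            new ListObserver("Observer1", log),
            new ListObserver("Observer2", log)
        };

        IProcessorFactory factory = new ObservedProcessorFactory(observers);
        IProcessor processor = factory.CreateProcessor("0");
        processor.Process(new[] { "a" });
        return log;
    }
}
```

With the real SDK, the equivalent factory would implement IEventProcessorFactory and be registered with the EventProcessorHost in place of the processor type; the trade-off is a little more wiring in exchange for no shared static state.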