I have an event-sourced system, and I am implementing an endpoint that rebuilds the read-side data stores from the events in the event store. However, I am running into what appear to be concurrency problems in how I process the events.
I decided to reuse my event handler code to process the events during the rebuild. In the system's normal state (no read-side DB rebuild in progress), my event handlers listen for the events they are subscribed to and update the projections accordingly. However, when I process the events through these handlers inline, the final state of the read-side DB is inconsistent (if the rebuild finishes at all, which it sometimes doesn't). I guess this means the handlers are executing out of order.
Should I not be using event handlers in this way? I figured that, since I am processing events, reusing the event handlers would be appropriate.
I am using MediatR for in-service messaging. All event handlers implement INotificationHandler<T>.
Here is a sample of the code:
    IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
    int eventNumber = 0;
    foreach (var e in events)
    {
        if (e.Version != eventNumber + 1)
            throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");
        var ev = e as Event;
        // publish different historic events to event handlers which work with read DBs
        switch (e.Type)
        {
            case EventType.WarehouseCreated:
                WarehouseCreated w = new WarehouseCreated(ev);
                await _mediator.Publish(w);
                break;
            case EventType.BoxCreated:
                BoxCreated b = new BoxCreated(ev);
                await _mediator.Publish(b);
                break;
            case EventType.BoxLocationChanged:
                BoxLocationChanged l = new BoxLocationChanged(ev);
                await _mediator.Publish(l);
                break;
        }
        eventNumber++;
    }
I have already tried replacing the await keyword with a call to Wait(), something like _mediator.Publish(bcc).Wait(). That doesn't seem like a great idea, since there is async code behind the call, and it didn't work either.
I have also tried queuing the event versions and having each event-type case wait until its version is at the front of the queue before publishing the event.
Something like:
    case EventType.BoxContentsChanged:
        BoxContentsChanged bcc = new BoxContentsChanged(ev);
        while (eventQueue.Peek() != bcc.Version)
            continue;
        await _mediator.Publish(bcc);
        eventQueue.Dequeue();
        break;
This also didn't work.
Anyway - if anyone has any ideas on how to deal with this problem, I would be very appreciative. I would prefer not to duplicate all the async event handler code in a synchronous way.
I guess the best way to do this is synchronously, to ensure consistency. This required me to duplicate some of my event handler logic in a Replay service and create synchronous repository methods. No race conditions now though, which is nice.
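For comparison, here is a minimal sketch of a replay loop that stays async but still applies events strictly one at a time. It does not use MediatR. StoredEvent, IProjection, InMemoryProjection and Replayer are hypothetical names made up for illustration, not types from my code base. It only gives ordered results under the assumption that each projection's returned Task completes after all of its writes have finished (no fire-and-forget work, no async void).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a stored event (not the real Event type).
public record StoredEvent(int Version, string Type);

// Hypothetical projection contract: the returned Task must not complete
// until every write for this event has finished.
public interface IProjection
{
    Task ApplyAsync(StoredEvent e);
}

// In-memory projection used only to make the ordering observable.
public sealed class InMemoryProjection : IProjection
{
    public List<string> Applied { get; } = new List<string>();

    public async Task ApplyAsync(StoredEvent e)
    {
        await Task.Delay(1); // simulates an asynchronous database write
        Applied.Add($"{e.Version}:{e.Type}");
    }
}

public static class Replayer
{
    // Replays events in version order, awaiting each projection before
    // moving on, so at most one event is being applied at any time.
    public static async Task ReplayAsync(
        IEnumerable<StoredEvent> events,
        IReadOnlyList<IProjection> projections)
    {
        int expectedVersion = 1;
        foreach (var e in events)
        {
            if (e.Version != expectedVersion)
                throw new InvalidOperationException(
                    $"Expected version {expectedVersion} but got {e.Version}.");

            foreach (var projection in projections)
                await projection.ApplyAsync(e); // awaited, never fire-and-forget

            expectedVersion++;
        }
    }
}
```

Calling Replayer.ReplayAsync with events versioned 1, 2, 3 and a single InMemoryProjection leaves Applied in that same order. If any handler starts work that it does not await, a loop like this cannot restore ordering, which may be why the fully synchronous version behaved differently.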