Search code examples
asynchronousmessagingcqrsevent-sourcingmediatr

Events Out of Order While Rebuilding Read Side DB


I have an event sourced system that I am now implementing an endpoint for that will rebuild the read side Data Stores from the event store events. However, I am now running into what seems to be concurrency problems with how I am processing the events.

I decided to use my event handler code to process the events during the rebuild. In the system's normal state (not a read side db rebuild), my event handlers listen for events they are subscribed to and update the projections accordingly. However, when processing these events through their event handlers in line, I am seeing inconsistent results in the read side DB final state (if it even gets there, which it sometimes doesn't). I guess this means they are executing out of order.

Should I not be using event handlers in this way? I figured since I am processing events, that reusing the event handlers in this way would be quite appropriate.


I am using MediatR for in service messaging. All event handlers implement INotificationHandler.

Here is a sample of the code:

IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
    if (e.Version != eventNumber + 1)
        throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");

    var ev = e as Event;
    // publish different historic events to event handlers which work with read DBs

    switch (e.Type)
    {
        case EventType.WarehouseCreated:
            WarehouseCreated w = new WarehouseCreated(ev);
            await _mediator.Publish(w);
            break;
        case EventType.BoxCreated:
            BoxCreated b = new BoxCreated(ev);
            await _mediator.Publish(b);
            break;
        case EventType.BoxLocationChanged:
            BoxLocationChanged l = new BoxLocationChanged(ev);
            await _mediator.Publish(l);
            break;
    }
    eventNumber++;    
}

I have already tried replacing the await keyword with a call to Wait() instead.

Something like _mediator.Publish(bcc).Wait(). But this doesn't seem like a great idea as there is async code behind this. Also it didn't work..

I have also tried queuing the event versions and having event type cases wait until their version is at the top of the queue before publishing the event.

Something like:

    case EventType.BoxContentsChanged:
        BoxContentsChanged bcc = new BoxContentsChanged(ev);
        while (eventQueue.Peek() != bcc.Version)
            continue;
        await _mediator.Publish(bcc);
        eventQueue.Dequeue();
        break;

This also didn't work.


Anyway - if anyone has any ideas on how to deal with this problem, I would be very appreciative. I would prefer not to duplicate all the async event handler code in a synchronous way.


Solution

  • I guess the best way to do this is synchronously, to ensure consistency. This required me to duplicate some of my event handler logic in a Replay service and create synchronous repository methods. No race conditions now though, which is nice.