c# domain-driven-design cqrs event-sourcing eventstoredb

In EventSourcing, what's the accepted wisdom around subscriptions?


As the title suggests. Currently I am using the GetEventStore backend for my events and that's working brilliantly. My confusion is with the subscriptions for keeping my Read store up to date.

Currently I'm creating a catch-up subscription using the

EventStoreConnection.SubscribeToAllFrom(Position.Start, _subscriptionSettings, OnEventMaterialized, OnLiveProcessingStarted, OnSubscriptionDropped);

method. This works well and I have a way of caching all my particular event observers and they build my read model well.

However, what if I wanted to replay a particular aggregate's events? As it stands, I need to restart the subscription from the beginning and let it work through everything. The way I've done this to date is to store each observer's current stream position and compare it with the position of each event the subscription delivers; if the stored position is greater, the observer skips the event.

Is it better to have each observer have its own subscription, so that only that one can be cleared? How do you know the Stream IDs in that case, do you need to persist every single stream id you create in order to re-subscribe at some later date?

Some code examples, or some reading, would be fantastic. I feel I'm missing the point with this final piece of the ES puzzle.


Solution

  • To answer your general question: yes, each observer should maintain its own position in the queue.

    "How do you know the Stream IDs in that case, do you need to persist every single stream id you create in order to re-subscribe at some later date?"

    Your stream names should be reproducible. The format I use is "events-{AggregateType}-{AggregateId}". I enable projections in GetEventStore (GES), so the stream "$events-{AggregateType}" also exists, and I can use it as the source stream for one of my observers if needed.

    Storing the observers' configurations in a DB makes it easy to add a new one:

    Name: All-Projections
    StreamName: $events
    Position: 1200 <-- This is updated by the observer as it reads
    BufferSize: 100 <-- Number of records to read at a time
    
    Name: Customers-Rebuild
    StreamName: $events-CustomerAggregate
    Position: 0 <-- This consumer has not run yet
    BufferSize: 100
    

    Then you can run your base consumer executable and pass the observer name as a parameter.
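
As a rough illustration of the two ideas above (reproducible stream names and a per-observer position), here is a minimal C# sketch. It deliberately does not call the EventStore client. `StreamNames`, `ObserverConfig` and `Observer` are hypothetical names invented for this example, and it assumes `Position` means "the number of the next event this observer expects". The "$events-{AggregateType}" name is taken from the answer as written. The built-in by-category projection names its streams "$ce-{category}", so use whatever name your projections actually emit.

```csharp
using System;

// Hypothetical helper (not part of any EventStore client): rebuilds stream
// names from the aggregate type and id, so stream ids never need persisting.
public static class StreamNames
{
    public static string ForAggregate(string aggregateType, Guid aggregateId)
        => $"events-{aggregateType}-{aggregateId}";

    // Per-type stream name as given in the answer; assumes a projection
    // is producing a stream with this name.
    public static string ForAggregateType(string aggregateType)
        => $"$events-{aggregateType}";
}

// Hypothetical row shape mirroring the Name/StreamName/Position/BufferSize
// fields of the stored observer configuration.
public sealed class ObserverConfig
{
    public string Name { get; set; } = "";
    public string StreamName { get; set; } = "";

    // Assumption: the number of the next event this observer expects to read.
    public long Position { get; set; }

    public int BufferSize { get; set; } = 100;
}

public sealed class Observer
{
    private readonly ObserverConfig _config;
    private readonly Action<long, string> _handle;

    public Observer(ObserverConfig config, Action<long, string> handle)
    {
        _config = config ?? throw new ArgumentNullException(nameof(config));
        _handle = handle ?? throw new ArgumentNullException(nameof(handle));
    }

    // Returns false when the event is behind the checkpoint and is skipped.
    public bool OnEvent(long eventNumber, string payload)
    {
        if (eventNumber < _config.Position)
            return false;

        _handle(eventNumber, payload);

        // In a real system this new position would be saved back to the DB.
        _config.Position = eventNumber + 1;
        return true;
    }

    // Replaying only this observer means resetting only its own checkpoint.
    public void Reset() => _config.Position = 0;
}
```

With this shape, resetting the "Customers-Rebuild" observer replays only that observer's stream, while "All-Projections" keeps its own position untouched.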