Search code examples
c#system.reactiverx.neteventstoredb

Rx: Count of Grouped Events in Moving Window


I have started looking at using Reactive Extensions with EventStore. As a proof of concept, I'd like to see if I can get Rx to consume an event stream and output the count of events grouped by type for a window of one second.

So, say that I am consuming a stream with the name "orders", I'd like to see something like the following appear in the console:

OrderCreated 201
OrderUpdated 111

(a second passes..)

OrderCreated 123
OrderUpdated 132

And so on.

So far, I have been able to get an output of the count of all events per second. But can't seem to be able to group them by event type.

The code I am using is based on a gist by James Nugent:

internal class EventStoreRxSubscription
{
    public Subject<ResolvedEvent> ResolvedEvents { get; }

    public Subject<SubscriptionDropReason>  DroppedReasons { get; }
    public EventStoreSubscription Subscription { get; }

    public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
    {
        Subscription = subscription;
        ResolvedEvents = resolvedEvent;
        DroppedReasons = droppedReasons;
    }
}

static class EventStoreConnectionExtensions
{
    public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos)
    {
        return Task<EventStoreRxSubscription>.Factory.StartNew(() => {

            var resolvedEvents = new Subject<ResolvedEvent>();
            var droppedReasons = new Subject<SubscriptionDropReason>();

            var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos, 
                                                                    (subscription, @event) => resolvedEvents.OnNext(@event), 
                                                                    (subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason));
            subscriptionTask.Wait();

            return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons);
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
         var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
         connection.ConnectAsync();

         var subscriptionTask = connection.SubscribeTo("orders", true);
         subscriptionTask.Wait();

         var events = subscriptionTask.Result.ResolvedEvents;

         var query = events.Timestamp()
                .Buffer(TimeSpan.FromSeconds(1))
                .Select(e => e.Count);

         query.Subscribe(Console.WriteLine);

         Console.ReadLine();
    }
 }

Solution

  • I have done something similar to this before, I used Throttle to group all events within a set frequency, however you could use Buffer to get the count/collection for every period.

    The example below provides an abstract example of how I achieved this, where AggregateType and AggregateFunction would be replaced by your own type and aggregation.

    GroupByUntil allows you to group by a type within a set period.

    subscription = observable
        .GroupByUntil(e => e.Key, e => e.Buffer(TimeSpan.FromSeconds(1)))
        .SelectMany(e => e.Aggregate(new AggregateType(), (a, e) => AggregateFunction(a, e))
        .Subscribe(onNext, onError, onCompleted);
    

    EDIT

    Below is a quick example I've knocked up to show how it can be done

    public class EventType
    {
        public string Type { get; set; }
    }
    
    public class AggregatedType
    {
        public string EventType { get; set; }
        public int Count { get; set; }
    }
    
    class Program
    {
        public delegate void ExampleEventHandler(EventType e);
    
        public static event ExampleEventHandler Event;
    
        static void Main(string[] args)
        {
            var subscription = Observable.FromEvent<ExampleEventHandler, EventType>(e => Event += e, e => Event -= e)
                .GroupByUntil(e => e.Type, e => e.Buffer(TimeSpan.FromSeconds(1)))
                .SelectMany(e => e
                    .Select(ev => new AggregatedType {  EventType = ev.Type })
                    .Aggregate(new AggregatedType(), (a, ev) => new AggregatedType { EventType = ev.EventType, Count = a.Count + 1 }))
                .Subscribe(OnAggregaredEvent, OnException, OnCompleted);
    
            Event(new EventType { Type = "A" });
            Event(new EventType { Type = "A" });
            Event(new EventType { Type = "B" });
            Event(new EventType { Type = "B" });
    
            SpinWait.SpinUntil(()=> false, TimeSpan.FromSeconds(2));
    
            Event(new EventType { Type = "A" });
            Event(new EventType { Type = "A" });
            Event(new EventType { Type = "B" });
            Event(new EventType { Type = "B" });
    
            Console.ReadLine();
        }
    
        static void OnAggregaredEvent(AggregatedType aggregated)
        {
            Console.WriteLine("Type: {0}, Count: {1}", aggregated.EventType, aggregated.Count);
        }
    
        static void OnException(Exception ex)
        {
        }
    
        static void OnCompleted()
        {
        }
    }