Search code examples
sagarebusrebus-kafka

How to implement a Saga on Topos?


Topos it .NET Event Processing library, similar to Rebus. Unlike Rebus, it is not so much for messages, as for event processing.

Rebus supports Sagas out of the "box", including in terms of persistence, correlation and concurrency. How to implement a Saga on Topos?

If Topos supports Sagas, is there an example of a Saga implementation somewhere?


Solution

  • Topos does not have any kind of built-in sagas, unfortunately.

    In Fleet Manager (the Rebus management app that comes with Rebus Pro, and the reason I made Topos) I made a saga-like event processor that uses MongoDB or LiteDB for persistence.

    This implementation is completely proprietary though, as it's part of a commercial software product, and it's not quite generic enough to be suited for reuse. I can tell you a little bit about it here anyway, hopefully to give you some inspiration on how you could go about building something like it yourself. 🙂

    The event processor is hosted in a Topos consumer, which dispatches all received events to a bunch of "projections", thus implementing the classic event sourced "left-fold" (current_state + event => new_state).

    Fleet Manager has projections in two flavors: process managers (i.e. projections that cause other events to be emitted by issuing commands) and views. The two types combined would be what you call a "saga" 🙂

    One possible view could be implemented like this (with lots of stuff removed for brevity):

    public class QueueInstanceView : ViewInstance<InstancePerQueue>, IExpire, IHaveAccountId, IHaveQueueName, ICanBeHidden
    {
        public string AccountId { get; set; }
        public string QueueName { get; set; }
    
        public DateTime LastActivity { get; set; }
            
        public bool Hidden { get; set; }
    
        protected override void DispatchEvent(AuditEvent auditEvent)
        {
            if (auditEvent.Body is EntityHidden entityHidden)
            {
                QueueName ??= entityHidden.Id;
                Hidden = !entityHidden.Reverse;
            }
            else
            {
                QueueName ??= auditEvent.GetQueueName();
            }
    
            LastActivity = auditEvent.GetTime();
        }
    }
    

    Note how the view class inherits from the generic ViewInstance<> class, closing it with the InstancePerQueue type. The base class keeps track of the ID of the view instance and some other stuff used to implement idempotency, and then InstancePerQueue defines how events are mapped to view instances.

    It looks like this:

    public class InstancePerQueue : ViewLocator
    {
        public override string[] GetViewIds(AuditEvent auditEvent)
        {
            if (auditEvent.Body is EntityHidden entityHidden)
            {
                if (entityHidden.HasType(EntityTypeNames.Queue))
                {
                    var accountId = auditEvent.GetAccountId();
                    return new[] { $"{accountId}/{entityHidden.Id}" };
                }
    
                return Array.Empty<string>();
            }
    
            var queueName = auditEvent.GetQueueNameOrNull();
            if (queueName == null) return Array.Empty<string>();
    
            var accountId = auditEvent.GetAccountId();
            return new[] { $"{accountId}/{queueName}" };
        }
    }
    
    

    thus correlating events with IDs on the form "/" (where "account" in Fleet Manager terminology is basically just an environment, i.e. the queue names get to IDENTIFY the queues within an account).

    Of course lots of logic is then implemented in the projection implementations, but while it's lengthy, it's also fairly straightforward.

    I hope that this could give you some inspiration on how you might want to approach building "sagas" for Topos. 🙂


    Btw. I cannot take credit for this particular design. I was originally exposed to a design very similar to this back in 2013-2014 by Emil Krog Ingerslev, who came up with it for an event-sourced application we were building at d60.

    I later imitated all of the moving parts to implement persistent projections for Cirqus, which we used for a couple of event-sourced apps.

    And finally I made my current implementation for Fleet Manager, back in 2016 when I needed something similar, only without the aggregate root stuff present in Cirqus, and working on Kafka instead of normal databases.