Event Sourcing: How to combine divergent states?


Suppose:

The events are A perceived, B perceived or Ping perceived. A possible sequence of events could be A,A,A,B,Ping.

The states are InA, InB, PingMissing.

The rules are

  1. No Ping among the events seen so far -> PingMissing.
  2. A -> InA
  3. B -> InB
  4. (Only Ping events -> InA)

I would like to end up with a single recommended action/state.

I see three possibilities for the transition function f(s,e)->s:

  1. Create a pseudo event like PingMissing perceived, so that everything lives in one function.
  2. Use two separate transition functions and combine their results.
  3. Use one transition function whose state is a tuple of the two states, then combine the result.

Any thoughts? Best practices?

Implementation of 2. in F# (language doesn't really matter):

type Event =
| A
| B
| Ping

type State1 = 
| InA
| InB

type State2 = 
| PingReceived
| PingMissing

type StateCombined = 
| InA'
| InB'
| PingMissing'

let f1 s e :State1 =
    match s,e with
    | _, A -> InA
    | _, B -> InB
    | _, _ -> s

let f2 s e :State2 =
    match s,e with
    | _, Ping -> PingReceived
    | _, _ -> s

let fCombined events = 
    let finalState1 = events |> Seq.fold f1 InA 
    let finalState2 = events |> Seq.fold f2 PingMissing 
    match finalState1, finalState2 with
    | _, PingMissing -> PingMissing'
    | InA, _ -> InA'
    | InB, _ -> InB'

fCombined [A;A;A;B]
// PingMissing'

fCombined [A;A;A;B;Ping]
// InB'

Solution

  • I would tend to model the unified state as a tuple of the two substates (broadly in this case: "has a ping been received" and "if a ping has been received, was the last perception an A or a B"). A convenience function can then distill that into a recommendation.

    This has the advantage of traversing the sequence of events only once, so it is more compatible with a view of the events as a stream: at the very least, you do not have to refetch the events from an event store or keep the entire sequence of events in memory.

    For example, in Scala (explicitly modeling the situation where neither A nor B has been perceived yet):

    sealed trait Event
    case object A extends Event
    case object B extends Event
    case object Ping extends Event
    
    sealed trait PingState
    case object PingReceived extends PingState  // Don't strictly need...
    case object PingMissing extends PingState
    
    sealed trait LastPerceivedState
    case object InA extends LastPerceivedState
    case object InB extends LastPerceivedState
    
    // ... could just as well be (Option[PingMissing.type], Option[LastPerceivedState])...
    type State = (PingState, Option[LastPerceivedState])
    
    // ... in which case, this is (Some(PingMissing), None)
    val InitialState: State = PingMissing -> None
    
    // PingMissing is an object, so its type is written PingMissing.type
    def distilledState(state: State): Either[PingMissing.type, Option[LastPerceivedState]] =
      state match {
        case (PingMissing, _) => Left(PingMissing)
        case (_, lpsOpt) => Right(lpsOpt)
      }
    

    The transition function could then be written directly (taking advantage of the fact that the events can be partitioned into events which affect PingState or LastPerceivedState but never both):

    val transitionF = { (state: State, evt: Event) =>
      val (ps, lpsOpt) = state
    
      evt match {
        case A    => ps           -> Some(InA)
        case B    => ps           -> Some(InB)
        case Ping => PingReceived -> lpsOpt
      }
    }
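
    As a minimal sketch of how these pieces fit together, the following self-contained Scala folds a batch of events through the transition in a single pass and then distills the result. The wrapper object `PingFold`, the method form `transition`, and the helper `recommend` are names assumed here for illustration; they are not part of the original answer.

    ```scala
    object PingFold {
      sealed trait Event
      case object A extends Event
      case object B extends Event
      case object Ping extends Event

      sealed trait PingState
      case object PingReceived extends PingState
      case object PingMissing extends PingState

      sealed trait LastPerceivedState
      case object InA extends LastPerceivedState
      case object InB extends LastPerceivedState

      type State = (PingState, Option[LastPerceivedState])
      val InitialState: State = (PingMissing, None)

      // Same logic as the transition function above, written as a method.
      def transition(state: State, evt: Event): State = {
        val (ps, lpsOpt) = state
        evt match {
          case A    => (ps, Some(InA))
          case B    => (ps, Some(InB))
          case Ping => (PingReceived, lpsOpt)
        }
      }

      def distilledState(state: State): Either[PingMissing.type, Option[LastPerceivedState]] =
        state match {
          case (PingMissing, _) => Left(PingMissing)
          case (_, lpsOpt)      => Right(lpsOpt)
        }

      // Hypothetical helper: one pass over the events, then distill.
      def recommend(events: Seq[Event]): Either[PingMissing.type, Option[LastPerceivedState]] =
        distilledState(events.foldLeft(InitialState)(transition))
    }
    ```

    With the sequences from the question, `PingFold.recommend(Seq(A, A, A, B))` gives `Left(PingMissing)` and `PingFold.recommend(Seq(A, A, A, B, Ping))` gives `Right(Some(InB))`. A result of `Right(None)` (a ping but no A or B yet) could be mapped to InA with `getOrElse(InA)` if rule 4 of the question should apply.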
    

    If there are events which affect both, decomposing the transition into subhandlers might simplify the code (at the expense of some possibly redundant invocations):

    val pingStateTransition = { (ps: PingState, evt: Event) =>
      if (ps == PingReceived) PingReceived
      else if (evt == Ping) PingReceived
      else ps
    }
    
    val lastPerceivedStateTransition = { (lpsOpt: Option[LastPerceivedState], evt: Event) =>
      evt match {
        case A => Some(InA)
        case B => Some(InB)
        case _ => lpsOpt
      }
    }
    
    val transitionF = { (state: State, evt: Event) =>
      pingStateTransition(state._1, evt) -> lastPerceivedStateTransition(state._2, evt)
    }
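
    To illustrate an event that touches both substates, here is a sketch that adds a hypothetical event `PingWithA` (a ping that also reports an A perception). That event does not appear in the question, and the names `SubhandlerSketch` and `transition` are likewise assumptions made for this example.

    ```scala
    object SubhandlerSketch {
      sealed trait Event
      case object A extends Event
      case object B extends Event
      case object Ping extends Event
      // Hypothetical event (assumption, not in the question): a ping that also reports A.
      case object PingWithA extends Event

      sealed trait PingState
      case object PingReceived extends PingState
      case object PingMissing extends PingState

      sealed trait LastPerceivedState
      case object InA extends LastPerceivedState
      case object InB extends LastPerceivedState

      type State = (PingState, Option[LastPerceivedState])
      val InitialState: State = (PingMissing, None)

      // Each subhandler sees every event but reacts only to the ones it cares about.
      def pingStateTransition(ps: PingState, evt: Event): PingState =
        evt match {
          case Ping | PingWithA => PingReceived
          case A | B            => ps
        }

      def lastPerceivedStateTransition(
          lpsOpt: Option[LastPerceivedState],
          evt: Event
      ): Option[LastPerceivedState] =
        evt match {
          case A | PingWithA => Some(InA)
          case B             => Some(InB)
          case Ping          => lpsOpt
        }

      // The combined transition simply pairs the two subhandler results.
      def transition(state: State, evt: Event): State =
        (pingStateTransition(state._1, evt), lastPerceivedStateTransition(state._2, evt))
    }
    ```

    Folding `Seq[Event](B, PingWithA)` from `InitialState` with `transition` ends in `(PingReceived, Some(InA))`: the single `PingWithA` event updates both halves of the tuple without either subhandler needing to know about the other.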