Search code examples
scalaakkaevent-sourcingakka-persistence

In Akka Typed Event Sourcing is it common to use a single db (the same event journal) for multiple typed persistent entities?


Suppose in application office there are two EventSourcedBehavior actors

trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand

trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent

trait OfficeCoffeeMachineCommand
case object MakeCoffee extends OfficeCoffeeMachineCommand

trait OfficeCoffeeMachineEvent
case object CoffeeMade extends OfficeCoffeeMachineEvent

val fridgeEntity = 
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge]()

val frontDoorEntity = 
  EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge]()

val coffeeMachineEntity = 
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge]()

Suppose that there was some action with the fridge and there were 1000 events registered with the fridge with various Persistence Ids [0-1000].

Such that the journal like:

ordering persistence_id event_ser_manifest
1 200 FridgeOpened
2 200 FridgeClosed
... ... ...
500 500 FridgeOpened
... ... ...
1000 501 FridgeClosed

If there was a GetCoffeeMachineState message coming in with persistence id 500 to frontDoorEntity actor. The frontDoorEntity would attempt to replay the persistence_id = 500 journal events. It would fail because it will not be able to cast OfficeFridgeEvent into the OfficeCoffeeMachineEvent actor (akka typed remember?).

Is this a common setup for this type of system. Or does every entity requires its own db that has an event journal with only "valid" type events that the actor accepts?

I am seeing this exact issue in my system right now. If someone (by accident) were to run 1000 of these queries I would have 1000 entity actors attempting to replay these events forever or until I restarted the pods.

What I end up having is infinite attempts to restart the entity actor with the following stack trace

 at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:200) 
 at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:98) 
 at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:73)
Caused by: java.lang.ClassCastException

which makes sense because a typed actor is attempting to process a different type events.


Solution

  • You said more than one entity (fridge, front door and coffe machine in your example). Each entity replies to different commands and persist different events.

    When you create EventSourcedBehavior using EventSourcedBehavior.apply()

      /**
       * Create a `Behavior` for a persistent actor.
       *
       * @param persistenceId stable unique identifier for the event sourced behavior
       * @param emptyState the intial state for the entity before any events have been processed
       * @param commandHandler map commands to effects e.g. persisting events, replying to commands
       * @param eventHandler compute the new state given the current state when an event has been persisted
       */
      def apply[Command, Event, State](
          persistenceId: PersistenceId,
          emptyState: State,
          commandHandler: (State, Command) => Effect[Event, State],
          eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
        val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
        EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
      }
    

    The first parameter is a PersistenceId. You are the one in charge to make that ID is unique. That object offers a factory method asking you for a hint and the entityId

      /**
       * Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
       * concatenating them with `|` separator.
       *
       * Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
       * The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
       * {{{
       * PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
       * }}}
       *
       * That format of the `PersistenceId` is not mandatory and only provided as a convenience of
       * a "standardized" format.
       *
       * Another separator can be defined by using the `apply` that takes a `separator` parameter.
       *
       * The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
       * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
       * you should use `""` as the separator.
       *
       * @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
       */
      def apply(entityTypeHint: String, entityId: String): PersistenceId
    

    As it is detailed in Event Sourcing - PersistenceId

    Persistence Id

    The PersistenceId is the stable unique identifier for the persistent actor in the backend event journal and snapshot store.

    Cluster Sharding is typically used together with EventSourcedBehavior to ensure that there is only one active entity for each PersistenceId (entityId). There are techniques to ensure this uniqueness, an example of which can be found in the Persistence example in the Cluster Sharding documentation. This illustrates how to construct the PersistenceId from the entityTypeKey and entityId provided by the EntityContext.

    The entityId in Cluster Sharding is the business domain identifier of the entity. The entityId might not be unique enough to be used as the PersistenceId by itself. For example two different types of entities may have the same entityId. To create a unique PersistenceId the entityId should be prefixed with a stable name of the entity type, which typically is the same as the EntityTypeKey.name that is used in Cluster Sharding. There are PersistenceId.apply factory methods to help with constructing such PersistenceId from an entityTypeHint and entityId.

    You can take akka persistence shopping cart sample - ShoppingCart Behavior as a good example

      def apply(cartId: String): Behavior[Command] = {
        EventSourcedBehavior[Command, Event, State](
          PersistenceId("ShoppingCart", cartId),
          State.empty,
          (state, command) => ...,
          (state, event) => ...
        )
      }
    

    Your code should be something like

    val fridgeEntity = 
      EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge](
      PersistenceId("Fridge"), UUID.randomUUID(),
      Fridge.emptyState,
      Fridge.commandHandler,
      Fridge.eventHandler
    )
    
    val frontDoorEntity = 
      EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge](
      PersistenceId("FrontDoor"), UUID.randomUUID(),
      FrontDoor.emptyState,
      FrontDoor.commandHandler,
      FrontDoor.eventHandler
    )
    
    val coffeeMachineEntity = 
      EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge](
      PersistenceId("CoffeeMachine"), UUID.randomUUID(),
      CoffeeMachine.emptyState,
      CoffeeMachine.commandHandler,
      CoffeeMachine.eventHandler
    )
    

    Once you persist an event for the entity in the journal, you would be able to see something like the following in the database

    ordering persistence_id event_ser_manifest
    1 Fridge|<uuid> FridgeOpened
    2 Fridge|<uuid> FridgeClosed
    ... ... ...
    1 CoffeMachine|<uuid> CoffeeMade
    2 CoffeMachine|<uuid> CoffeeMade
    ... ... ...
    1 FrontDoor|<uuid> FrontDoorOpended
    2 FrontDoor|<uuid> FrontDoorClosed

    Do I need to put everything in the same DB? Should I use a relational or non-relational DB?

    As always, it depends. If you need persist millions of events per second and those events comes from millones of different devices, the answer will be NO. Akka persistence offers different persistence plugins

    Each of them has its own pros and cons.

    There are more plugins that you can find at scala index - akka persistence but they could be outdated, not having commercial support and else.