Search code examples
scalaakkaakka-streamevent-sourcingakka-persistence

How can an event sourced entity to subscribe to state changes in another entity?


I have an events-sourced entity (C) that needs to change its state in response to state changes in another entity of a different type (P). The logic to whether the state of C should actually change is quite complex and the data to compute that lives in C; moreover, many instances of C should listen to one instance of P, and the set of instances increases over time, so I'd rather they pull out of a stream knowing the ID of P than have P keep track of the IDs of all the Cs and push to them.

I am thinking of doing something such as:

  1. Tag a projection of P's events
  2. Have a Subscribe(P.id) command that gets sent to C
  3. If C is not already subscribing to a P (it can only subscribe to one, and it shouldn't change), fire an event Subscribed(P.id)
  4. In response to the event, use Akka-persistent-query to materialize the stream of events tagged in 1, map them to commands, and run asynchronously with a sync that sends them to my ES entity reference

This seems a bit like an anti pattern to have a stream run in the event handler. I am wondering if there's a better/more supported way to do this without the upstream having to know about the downstream. I decided against Akka pub-sub because it does at-most-once delivery, and I'd like to avoid using Kafka if possible.


Solution

  • You definitely don't want to run the stream in the event handler: the event handler should never side effect.

    Assuming that you would like a C to get events from times when that C was not running (including before that C had ever run), this suggests that a stream should be run for each C. Since the subscription will be to one particular P, I'd seriously consider not tagging, but instead using the eventsByPersistenceId stream to get all the events of a P and ignore the ones that aren't of interest. In the stream, you translate those to commands in C's API, including the offset in P's event stream with the command, and send it to C (for at-least-once delivery, a mapAsync with an ask is useful; C will persist an event recording that it processed the offset: this allows the command to be idempotent, as C can acknowledge the command if the offset is less-than-or-equal-to the high water offset in its state).

    This stream gets kicked off by the command-handler after successfully persisting a Subscribed(P.id) event (in this case starting from offset 0) and then gets kicked off after the persistent actor is rehydrated if the state shows it's subscribed (in this case starting from one plus the high water offset).

    The rationale for not using tagging here arises from an assumption that the number of events C isn't interested in is smaller than the number of events with the tag from Ps that C isn't subscribed to (note that for most of the persistence plugins, the more tags there are, the more overhead there is: a tag which is only used by one particular instance of an entity is often not a good idea). If the tag in question is rarely seen, this assumption might not hold and eventsByTag and filtering by id could be useful.

    This does of course have the downside of running discrete streams for every C: depending on how many Cs are subscribed to a given P, the overhead of this may be substantial, and the streams for subscribers which are caught up will be especially wasteful. In this scenario, responsibility for delivering commands to subscribed Cs for a given P can be moved to an actor. The only real change in that scenario is that where C would run the stream, it instead confirms that it is subscribed to the event stream by asking that actor feeding events from the P. Because this approach is a marked step-up in complexity (especially around managing when Cs join and drop out of the shared "caught-up" stream), I'd tend to recommend starting with the stream-per-C approach and then going to the shared stream (it's also worth noting that there can be multiple shared streams: in fact I'd tend to have shared streams be per-ActorSystem (e.g. a "node singleton" per P of interest) so as not to involve remoting), since it's not difficult to make the transition (from C's perspective, there's not really a difference whether the adapted commands are coming from a stream it started or from a stream being run by some other actor).