
Akka Persistence Query and actor sharding


I'm building the query side of a CQRS app with Akka actors.

The query actors are set up as a cluster shard and are fed with events from a single Persistence Query stream.

My questions are:

  1. If one of the actors in the cluster shard restarts, how do I recover its state?

    • Shut down the whole cluster shard and replay all the events?
    • Make the actors in the cluster shard persistent actors and save a new set of events for the query side only?
  2. If the actor that feeds the shard from the Persistence Query restarts, how can I cancel the current Persistence Query and start it again?


Solution

  • As discussed, I would consider persisting your query side in a database.

    If that is not an option and you want to stick with a single persistence query per shard, do the following in your query actor:

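    import akka.actor.{Actor, Stash}
    import akka.persistence.query.{EventEnvelope, PersistenceQuery}
    import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
    import akka.stream.Materializer
    import akka.stream.scaladsl.Sink
    
    object QueryActor {
      trait Event // stand-in for your own live event type
      final case class Replay(envelope: EventEnvelope) // marks replayed messages
      case object RecoveryCompleted
      final case class RecoveryFailed(cause: Throwable)
    }
    
    class QueryActor extends Actor with Stash {
      import QueryActor._
    
      // Assumes Akka 2.6+ APIs; "read-journal-plugin-id" is a placeholder for your read journal's id
      implicit val mat: Materializer = Materializer(context)
      private val readJournal = PersistenceQuery(context.system)
        .readJournalFor[CurrentEventsByPersistenceIdQuery]("read-journal-plugin-id")
    
      private var inRecovery: Boolean = true
    
      override def preStart(): Unit = {
        // Subscribe to your live event stream now, so you don't miss anything during recovery,
        // e.g. send a Subscription message to your persistence query actor
    
        // Re-read everything up to now for recovery; this stream completes at the current end of the journal.
        // Sink.actorRef sends RecoveryCompleted only after the last replayed event has been sent to self.
        readJournal
          .currentEventsByPersistenceId("persistenceId", 0L, Long.MaxValue)
          .map(Replay(_)) // mark replayed messages
          .runWith(Sink.actorRef(self, RecoveryCompleted, (e: Throwable) => RecoveryFailed(e)))
      }
    
      override def receive: Receive = {
        case RecoveryCompleted =>
          inRecovery = false
          unstashAll() // unstash all live events received during recovery
    
        case Replay(envelope) =>
          // handle replayed event, e.g. envelope.event
    
        case RecoveryFailed(cause) =>
          throw cause // let the supervisor restart the actor, which runs preStart (and recovery) again
    
        case event: Event =>
          if (inRecovery) stash() // stash live events until recovery is done
          else {
            // recovery is done, handle live events normally
          }
      }
    }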
    

    In short: when the actor starts, first subscribe to the live persistence query, then recover the state from a finite stream of all past events, which completes once every stored event has passed through. During recovery, stash every incoming message that is not a replayed event. When recovery is done, unstash everything and start handling live events normally.
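The recovery protocol above can be modeled without Akka as a small state machine, which makes the ordering easy to test. This is a minimal sketch in plain Scala, not the actor itself: `Replay`, `Live`, `RecoveryCompleted` and `RecoveringProjection` are hypothetical names, and it assumes each event carries a monotonically increasing sequence number (as `EventEnvelope.sequenceNr` does for a single persistenceId). Because the live subscription starts before the replay, the same event can arrive both ways, so the sketch skips any event whose sequence number has already been applied.

```scala
import scala.collection.mutable

// Hypothetical message types modeling what the query actor receives
sealed trait Msg
final case class Replay(seqNr: Long, event: String) extends Msg
final case class Live(seqNr: Long, event: String) extends Msg
case object RecoveryCompleted extends Msg

// Pure model of the query actor's recovery logic (no Akka involved)
final class RecoveringProjection {
  private var inRecovery = true
  private val stash = mutable.Queue.empty[Live]          // live events held during recovery
  private var lastSeqNr = 0L                             // highest sequence number applied so far
  private val applied = mutable.ListBuffer.empty[String] // the projected "read model"

  def state: List[String] = applied.toList

  // Apply an event only if it is newer than anything already applied,
  // so an event seen both by the replay and the live stream counts once
  private def applyEvent(seqNr: Long, event: String): Unit =
    if (seqNr > lastSeqNr) {
      applied += event
      lastSeqNr = seqNr
    }

  def receive(msg: Msg): Unit = msg match {
    case Replay(seqNr, event)         => applyEvent(seqNr, event)
    case l @ Live(_, _) if inRecovery => stash.enqueue(l) // like stash() in the actor
    case Live(seqNr, event)           => applyEvent(seqNr, event)
    case RecoveryCompleted =>
      inRecovery = false
      // like unstashAll(): process held live events in arrival order
      while (stash.nonEmpty) {
        val l = stash.dequeue()
        applyEvent(l.seqNr, l.event)
      }
  }
}

object Demo extends App {
  val p = new RecoveringProjection
  p.receive(Live(3, "c"))      // live event arrives during recovery: stashed
  p.receive(Replay(1, "a"))
  p.receive(Replay(2, "b"))
  p.receive(Replay(3, "c"))    // the same event also comes through the replay
  p.receive(RecoveryCompleted) // stashed Live(3) is skipped as a duplicate
  p.receive(Live(4, "d"))
  println(p.state)             // List(a, b, c, d)
}
```

Keeping the logic in a plain class like this is one possible design choice: the actor's `receive` can delegate to it, and the stash/unstash ordering can be unit tested without starting an actor system.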