Tags: scala, akka, akka-persistence

How do I reject a snapshot during recovery?


I'm currently trying to add snapshotting to an event-sourced application using Akka Persistence. The strategy I'm trying to follow is:

  • Save a snapshot every X events
  • Maintain a schemaVersion property on my state object. The purpose of the schemaVersion is to allow me to evolve/migrate my state if I change the way I'm replaying my events.
  • During recovery, if I receive a SnapshotOffer whose state.schemaVersion is less than my current schema version, discard that snapshot and replay events from the beginning
  • Delete all snapshots earlier than my latest valid snapshot after recovery completes

However, I'm finding that I'm not able to discard snapshots during recovery. If there is a snapshot in the snapshot store, earlier events will not be offered to me.

I'm having a hard time finding documentation on how this should be handled. What's the proper approach here?


Solution

  • One thing I don't understand: what is the state object, and where does it get its schemaVersion from? Isn't the state exactly your actor's state, reconstructed from the snapshot and events?

    In any case, you can't delete or skip a snapshot once it has been offered. Instead, you can do the following:

    1. Add a config flag drop-snapshot (defaults to false)
    2. In preStart, delete the stored snapshots when the flag is set

      override def preStart(): Unit = {
        super.preStart()
        // The Latest criteria has no upper bounds, so it matches every stored snapshot
        if (dropSnapshot) deleteSnapshots(SnapshotSelectionCriteria.Latest)
      }

    3. Override the recovery method so that no snapshot is offered when the flag is set

      override def recovery: Recovery = {
        if (dropSnapshot) {
          // No snapshot is offered, so events are replayed from the beginning
          Recovery(fromSnapshot = SnapshotSelectionCriteria.None)
        } else {
          Recovery(fromSnapshot = SnapshotSelectionCriteria.Latest)
        }
      }
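
For context, here is a minimal sketch of how these pieces might fit together in one classic PersistentActor. The Counter actor, the Incremented event, the CounterState type, the criteriaFor helper, the persistence id and the 100-event snapshot interval are all hypothetical names and values chosen for illustration. It assumes dropSnapshot is read from configuration by whoever creates the actor, and that a journal and snapshot store plugin are configured.

```scala
import akka.persistence.{PersistentActor, Recovery, SnapshotOffer, SnapshotSelectionCriteria}

// Hypothetical event and state types, used only for illustration.
final case class Incremented(by: Int)
final case class CounterState(count: Int) {
  def updated(event: Incremented): CounterState = copy(count = count + event.by)
}

object Counter {
  // Pure helper so the snapshot-selection rule can be checked without starting an actor.
  def criteriaFor(dropSnapshot: Boolean): SnapshotSelectionCriteria =
    if (dropSnapshot) SnapshotSelectionCriteria.None
    else SnapshotSelectionCriteria.Latest
}

// dropSnapshot is assumed to come from configuration (the drop-snapshot flag).
class Counter(dropSnapshot: Boolean) extends PersistentActor {
  override def persistenceId: String = "counter-1" // hypothetical id

  private var state = CounterState(0)

  override def preStart(): Unit = {
    super.preStart()
    // The Latest criteria has no upper bounds, so this requests deletion of every stored snapshot.
    if (dropSnapshot) deleteSnapshots(SnapshotSelectionCriteria.Latest)
  }

  // With criteria None no snapshot is offered, so the journal is replayed from the first event.
  override def recovery: Recovery =
    Recovery(fromSnapshot = Counter.criteriaFor(dropSnapshot))

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: CounterState) => state = snapshot
    case event: Incremented                       => state = state.updated(event)
  }

  override def receiveCommand: Receive = {
    case by: Int =>
      persist(Incremented(by)) { event =>
        state = state.updated(event)
        // Snapshot every 100 events; the interval is an arbitrary choice for this sketch.
        if (lastSequenceNr % 100 == 0) saveSnapshot(state)
      }
  }
}
```

One way to use this would be to enable the flag for a single deployment after changing how events are replayed, then switch it off again so that freshly written snapshots are picked up on later recoveries.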