Search code examples
javaakkaakka-persistence

Synchronous (or Future-returning) call to Akka UntypedPersistentActor?


I would like to inform callers of the successful application of state-changing commands sent to my object, which extends UntypedPersistentActor.

If I weren't persisting the state-changes, I'd return a Future with a closure affecting the change, and let the caller Await that.

Akka's "persist()" call forces me to pass in a closure that will be executed asynchronously though, if I understand correctly. If I return a Future that itself invokes persist(), I'm only telling the caller that I've successfully queued the change for later application... not that it has been applied, or even that its application will succeed.

I suppose that the guaranteed ordering with respect to one caller kinda satisfies the need to know when the change has been applied, but what if the change fails? If the callee is restarted because of the failure, the caller's message will be dropped, the state change will never happen, and the caller will be unaware. It seems like it would be cleaner to return the error to the caller and surface it there.

Is there a good way to achieve this?


Solution

  • Do not call persist from a Future. It must be called in the receive's context (thread), as it relies on the actors internal state for house-keeping.

    The easiest would be to use the ask pattern, from akka.pattern.ask:

    case c: Command =>
      persist(Event(c)) { sender() ! e }
    
    // caller:
    import akka.pattern.ask
    val f: Future[Event] = (persistentActor ? Command()).mapTo[Event]
    

    You could also use a Promise[T] which your actor can fulfil and send a Future[T] of that promise back to the sender:

    case c: Command =>
      val p = Promise[Event]()
    
      persist(Event(c)) { e => 
        p success e 
      }
    
      sender() ! p.future // Future[T]