Tags: scala, akka, actor

Send message to actor after restart from Supervisor


I am using the BackoffSupervisor strategy to create a child actor that has to process some message. I want to implement a very simple restart strategy in which, in case of an exception:

  1. The child propagates the failing message to the supervisor.
  2. The supervisor restarts the child and sends the failing message again.
  3. The supervisor gives up after 3 retries.

Akka Persistence is not an option.

So far what I have is this:

Supervisor definition:

import akka.actor.{OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  )
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
        case msg: MessageException =>
          println("caught specific message!")
          SupervisorStrategy.Restart
        case _: Exception => SupervisorStrategy.Restart
        case _            => SupervisorStrategy.Escalate
      })
)

val sup = context.actorOf(supervisor)


sup ! cmd

Child actor that is supposed to send the e-mail, but fails (throws some exception) and propagates the exception back to the supervisor:

import akka.actor.Actor
import scala.util.control.NonFatal

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        // wrap the failure together with the message that caused it
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}

In the code above, I wrap any exception in a custom MessageException class that gets propagated to the SupervisorStrategy. But how do I propagate it further to the new child to force reprocessing? Is this the right approach?

Edit: I attempted to resend the message to the actor in the preRestart hook, but somehow the hook is not being triggered:

import akka.actor.Actor
import scala.util.control.NonFatal

class SenderActor() extends Actor {

  def fakeSendMail(): Unit = {
    Thread.sleep(1000)
    //    println("mail sent!")
    throw new Exception("surprising exception")
  }

  override def preStart(): Unit = {
    println("child starting")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case _: MessageException =>
        println("aaaaa")
        message.foreach(self ! _) // try to re-enqueue the failing message
      case _ => println("bbbb")
    }
  }

  override def postStop(): Unit = {
    println("child stopping")
  }

  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case NonFatal(t) => throw MessageException(cmd, t)
      }
  }
}

This gives me output similar to the following:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690] [example-akka.actor.default-dispatcher-2] [akka://example/user/persistentActor-4-scala/$a/1962829645] Could not process message sample.persistence.MessageException: Could not process message <stacktrace>
child starting

But there is no output from the preRestart hook.


Solution

  • The child's preRestart hook is not invoked because Backoff.onFailure uses BackoffOnRestartSupervisor under the hood, which replaces the default restart behavior with a stop-and-delayed-start behavior consistent with the backoff policy. In other words, with Backoff.onFailure a "restarted" child is actually stopped and later started again as a new instance, so its preRestart method is never called. This matches the "child stopping" / "child starting" lines in your output. (Using Backoff.onStop can trigger the child's preRestart hook, but that's tangential to the present discussion.)
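
    If delays between attempts are not a hard requirement, the hook does fire under ordinary supervision. A minimal sketch, assuming classic Akka (akka-actor 2.5 or 2.6) on the classpath: a plain parent restarts the child through a OneForOneStrategy, and the child's preRestart re-enqueues the failing message to self, whose mailbox survives the restart. FlakySender, PlainParent, PreRestartDemo and the NewMail/MessageException shapes are hypothetical stand-ins, not code from the question.

    ```scala
    import java.util.concurrent.atomic.AtomicInteger

    import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy, Terminated}

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    // Hypothetical message and exception types for this sketch.
    final case class NewMail(id: Long)
    final case class MessageException(cmd: NewMail, cause: Throwable) extends Exception(cause)

    object FlakySender {
      def props(attempts: AtomicInteger): Props = Props(new FlakySender(attempts))
    }

    // Fails on every attempt; before each restart it re-enqueues the failing message.
    class FlakySender(attempts: AtomicInteger) extends Actor {
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        // Runs on the failing instance. The ActorRef and its mailbox survive the restart,
        // so the fresh instance receives this message again.
        message.foreach(self ! _)
        super.preRestart(reason, message) // default: stop children, then call postStop
      }

      def receive: Receive = {
        case cmd: NewMail =>
          attempts.incrementAndGet()
          throw MessageException(cmd, new RuntimeException("surprising exception"))
      }
    }

    object PlainParent {
      def props(attempts: AtomicInteger, done: Promise[Unit]): Props =
        Props(new PlainParent(attempts, done))
    }

    // Ordinary (non-backoff) supervision: Restart really restarts, so preRestart is called.
    class PlainParent(attempts: AtomicInteger, done: Promise[Unit]) extends Actor {
      override val supervisorStrategy: SupervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
          case _: MessageException => SupervisorStrategy.Restart
        }

      private val child: ActorRef =
        context.watch(context.actorOf(FlakySender.props(attempts), "sender"))

      def receive: Receive = {
        case Terminated(`child`) => done.trySuccess(()) // retries exhausted, child stopped
        case msg                 => child.forward(msg)
      }
    }

    object PreRestartDemo {
      // Sends one NewMail and returns how many times the child tried to process it.
      def run(): Int = {
        val system = ActorSystem("prerestart-demo")
        try {
          val attempts = new AtomicInteger(0)
          val done = Promise[Unit]()
          system.actorOf(PlainParent.props(attempts, done), "parent") ! NewMail(1L)
          Await.result(done.future, 10.seconds)
          attempts.get()
        } finally {
          system.terminate()
        }
      }
    }
    ```

    With maxNrOfRetries = 3 the parent allows three restarts and stops the child on the fourth failure, so PreRestartDemo.run() should report four processing attempts. The trade-off is that the retries happen back to back, without the delays that BackoffSupervisor provides.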

    The BackoffSupervisor API doesn't support automatically resending a message when the supervisor's child restarts; you have to implement this behavior yourself. One idea is to let the actor that created the BackoffSupervisor (i.e., the BackoffSupervisor's own parent) handle the retries. For example:

    // Assumes the enclosing actor mixes in akka.actor.Timers (for `timers`) and that
    // ChildIsStopped, Error and Replay are your own message types.
    val supervisor = BackoffSupervisor.props(
      Backoff.onFailure(
        ...
      ).withReplyWhileStopped(ChildIsStopped)
        .withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              // `self` is the enclosing actor here, not the BackoffSupervisor
              self ! Error(msg.cmd) // replace cmd with whatever the property name is
              SupervisorStrategy.Restart
            case ...
          })
    )
    
    val sup = context.actorOf(supervisor)
    
    def receive = {
      case cmd: NewMail =>
        sup ! cmd
      case Error(cmd) =>
        // We assume that NewMail has an id field. Also, adjust the time as needed.
        timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
      case Replay(cmd) =>
        sup ! cmd
      case ChildIsStopped =>
        println("child is stopped")
    }
    

    In the code above, the NewMail message embedded in the MessageException is wrapped in a custom case class (to easily distinguish it from a "normal"/new NewMail message) and sent to self. In this context, self is the actor that created the BackoffSupervisor. This enclosing actor then uses a single timer to replay the original message at some point. This point in time should be far enough in the future that the BackoffSupervisor can exhaust SenderActor's restart attempts, giving the child ample opportunity to get into a "good" state before it receives the resent message. Note that this example resends the message only once, regardless of the number of child restarts: starting a timer with a key that is already active cancels the earlier timer.
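
    To pick that replay delay, it helps to estimate how long the restart attempts can take. A rough sketch in plain Scala, assuming each backoff is minBackoff × 2^n capped at maxBackoff and then stretched by up to randomFactor (0.2 meaning up to 20% extra), which is how the Akka documentation describes these parameters; the exact internal formula is an assumption here. BackoffBudget and its methods are hypothetical names.

    ```scala
    // Rough, assumed model of exponential backoff, in seconds.
    object BackoffBudget {
      // Largest delay before restart number `restart` (0-based), with the random factor maxed out.
      def maxDelaySeconds(restart: Int, minBackoff: Double, maxBackoff: Double, randomFactor: Double): Double =
        math.min(maxBackoff, minBackoff * math.pow(2, restart)) * (1.0 + randomFactor)

      // Worst case for `restarts` restarts: all backoff waits plus the work of every attempt
      // (the first attempt plus one per restart).
      def worstCaseSeconds(restarts: Int, minBackoff: Double, maxBackoff: Double,
                           randomFactor: Double, workSeconds: Double): Double = {
        val waits = (0 until restarts).map(n => maxDelaySeconds(n, minBackoff, maxBackoff, randomFactor)).sum
        waits + (restarts + 1) * workSeconds
      }
    }
    ```

    With the question's settings (minBackoff 1 s, maxBackoff 2 s, randomFactor 0.2) and the 1-second Thread.sleep in fakeSendMail, BackoffBudget.worstCaseSeconds(3, 1.0, 2.0, 0.2, 1.0) gives roughly 1.2 + 2.4 + 2.4 s of backoff plus 4 s of work, about 10 s, which is the same order as the 10.seconds used above.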


    Another idea is to create a BackoffSupervisor-SenderActor pair for every NewMail message and have the SenderActor send the NewMail message to itself in its preStart hook. One concern with this approach is cleaning up resources, i.e., shutting down the BackoffSupervisors (which in turn shut down their respective SenderActor children) when processing succeeds or when the child restarts are exhausted. A map from NewMail ids to (ActorRef, Int) tuples, in which the ActorRef is a reference to a BackoffSupervisor actor and the Int is the number of restart attempts, helps in this case:

    class Overlord extends Actor {
    
      var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
    
      def receive = {
        case cmd: NewMail =>
          val childProps = Props(new SenderActor(cmd, self))
          val supervisor = BackoffSupervisor.props(
            Backoff.onFailure(
              ...
            ).withSupervisorStrategy(
              OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
                case msg: MessageException =>
                  println("caught specific message!")
                  self ! Error(msg.cmd)
                  SupervisorStrategy.Restart
                case ...
              })
          )
          val sup = context.actorOf(supervisor)
          state += (cmd.id -> (sup, 0))
    
        case ProcessingDone(cmdId) =>
          state.get(cmdId) match {
            case Some((backoffSup, _)) =>
              context.stop(backoffSup)
              state -= cmdId
            case None =>
              println(s"${cmdId} not found")
          }
    
        case Error(cmd) =>
          val cmdId = cmd.id
          state.get(cmdId) match {
            case Some((backoffSup, numRetries)) =>
              if (numRetries == 3) {
                println(s"${cmdId} has already been retried 3 times. Giving up.")
                context.stop(backoffSup)
                state -= cmdId
              } else
                state += (cmdId -> (backoffSup, numRetries + 1))
            case None =>
              println(s"${cmdId} not found")
          }
    
        case ...
      }
    }
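
    The Map bookkeeping in Overlord does not depend on actors, so one option is to pull it into pure functions that can be unit-tested without an ActorSystem. A sketch in plain Scala; RetryBook, Outcome and its cases are hypothetical names, and S stands for whatever handle Overlord keeps per mail (an ActorRef there).

    ```scala
    // Actor-free version of Overlord's per-mail bookkeeping (hypothetical names).
    object RetryBook {
      sealed trait Outcome
      case object Retry extends Outcome    // failure recorded; let the backoff continue
      case object GiveUp extends Outcome   // retries exhausted; caller stops the supervisor
      case object NotFound extends Outcome // unknown id (already finished or given up)

      // mail id -> (supervisor handle, retries recorded so far)
      type State[S] = Map[Long, (S, Int)]

      def onError[S](state: State[S], id: Long, maxRetries: Int): (State[S], Outcome) =
        state.get(id) match {
          case Some((_, n)) if n >= maxRetries => (state - id, GiveUp)
          case Some((sup, n))                  => (state.updated(id, (sup, n + 1)), Retry)
          case None                            => (state, NotFound)
        }

      // On success, forget the mail and hand back its supervisor so the caller can stop it.
      def onDone[S](state: State[S], id: Long): (State[S], Option[S]) =
        state.get(id) match {
          case Some((sup, _)) => (state - id, Some(sup))
          case None           => (state, None)
        }
    }
    ```

    Overlord's Error branch would call onError and stop the supervisor on GiveUp, and its ProcessingDone branch would call onDone. Apart from using n >= maxRetries instead of == 3 (a small defensive choice), this mirrors the Error case above: a mail is dropped when an error arrives after three recorded retries.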
    

    Note that SenderActor in the above example takes a NewMail and an ActorRef as constructor arguments. The latter argument allows the SenderActor to send a custom ProcessingDone message to the enclosing actor:

    class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
      override def preStart(): Unit = {
        println(s"child starting, sending ${cmd} to self")
        self ! cmd
      }
    
      def fakeSendMail(): Unit = ...
    
      def receive = {
        case cmd: NewMail => ...
      }
    }
    

    Obviously the SenderActor is set up to fail every time with the current implementation of fakeSendMail. I'll leave the additional changes needed in SenderActor to implement the happy path, in which SenderActor sends a ProcessingDone message to target, to you.
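
    For that happy path, a minimal sketch, again assuming classic Akka on the classpath. The injected send function, the NewMail(id, to) and ProcessingDone(cmdId) shapes, and the DoneListener/HappyPathDemo helpers are hypothetical; injecting send instead of hard-coding fakeSendMail is just one way to make success and failure easy to exercise.

    ```scala
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._
    import scala.util.control.NonFatal

    // Hypothetical message and exception types for this sketch.
    final case class NewMail(id: Long, to: String)
    final case class ProcessingDone(cmdId: Long)
    final case class MessageException(cmd: NewMail, cause: Throwable) extends Exception(cause)

    object SenderActor {
      // `send` is a hypothetical stand-in for the real mail call; it throws on failure.
      def props(cmd: NewMail, target: ActorRef, send: NewMail => Unit): Props =
        Props(new SenderActor(cmd, target, send))
    }

    class SenderActor(cmd: NewMail, target: ActorRef, send: NewMail => Unit) extends Actor {
      // Every (re)start, including the stop-and-start done by the backoff supervisor,
      // processes the mail again.
      override def preStart(): Unit = self ! cmd

      def receive: Receive = {
        case mail: NewMail =>
          try send(mail)
          catch { case NonFatal(t) => throw MessageException(mail, t) } // supervisor decides
          target ! ProcessingDone(mail.id) // only reached if send succeeded
      }
    }

    // Demo-only target that completes a Promise when ProcessingDone arrives.
    class DoneListener(result: Promise[Long]) extends Actor {
      def receive: Receive = { case ProcessingDone(id) => result.trySuccess(id) }
    }

    object HappyPathDemo {
      def run(): Long = {
        val system = ActorSystem("happy-path-demo")
        try {
          val result = Promise[Long]()
          val target = system.actorOf(Props(new DoneListener(result)), "overlord-stub")
          system.actorOf(SenderActor.props(NewMail(42L, "someone@example.com"), target, _ => ()), "sender")
          Await.result(result.future, 5.seconds)
        } finally {
          system.terminate()
        }
      }
    }
    ```

    On failure the MessageException still reaches the BackoffSupervisor's strategy as before; on success the target (the Overlord in the example above) receives ProcessingDone and can stop that mail's BackoffSupervisor.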