Search code examples
scalaakkaactorlifecycleakka-supervision

postRestart and preRestart methods are not getting invoke in akka actots


I am following this tutorial here is my code

case class ArtGroupDeleteFromES (uuidList:List[String]) 
class ArtGroupDeleteESActor extends Actor{
val log = LoggerFactory.getLogger(this.getClass)
    override def preStart() {
      log.debug("preStart  Starting ArtGroupDeleteESActor instance hashcode # {}",
       this.hashCode())
      }

    override def postStop() {
      log.debug("postStop Stopping ArtGroupDeleteESActor instance hashcode # {}",
       this.hashCode())
      }

    override def preRestart(reason: Throwable, message: Option[Any]) {
      log.debug("I am restarting")
      log.debug("ArtGroupDeleteESActor: preRestart")
      log.debug(s" MESSAGE: ${message.getOrElse("")}")
      log.debug(s" REASON: ${reason.getMessage}")
      super.preRestart(reason, message)
      }

    override def postRestart(reason: Throwable) {
      log.debug("restart completed!")
      log.debug("ArtGroupDeleteESActor: postRestart")
      log.debug(s" REASON: ${reason.getMessage}")
      super.postRestart(reason)
      }
def receive = {
  case ArtGroupDeleteFromES(uuidList) =>
 throw new Exception("Booom")
  sender ! true
   }
  case  message => 
    log.warn("Received unknown message: {}", message)
    unhandled(message)
 }

}

and here is the how i am sending this actor a message

class ArtGroupDeletionActor extends Actor{

  val log = LoggerFactory.getLogger(this.getClass)
 override val supervisorStrategy = OneForOneStrategy(
                                    maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
    case _:Exception => Restart
  }
 val artGroupDeleteESActor=context.actorOf(Props[ArtGroupDeleteESActor]
      .withDispatcher("akka.actor.ArtGroupDeleteESActor-dispatcher")
      ,name = "ArtGroupDeleteESActor")

   def receive = {

    case DeleteArtGroup(uuidList) =>
      val future1  = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean]
      var isDeletedfromES = Await.result(future1, timeout.duration)
    case message =>
      log.warn("Unhandled message received : {}", message)
      unhandled(message)
  }
}

object test extends App{
 val artGroupDeletionActor=system.actorOf(Props[ArtGroupDeletionActor]
      .withDispatcher("akka.actor.ArtGroupDeletionActor-dispatcher")
      ,name = "ArtGroupDeletionActor")
     artGroupDeletionActor ! DeleteArtGroup(List("123"))
}

the PostRestart() and preRestart() methods are not invoking,but preStart() and postStop() gets called, please guide me where i am doing wrong


Solution

  • (for simplicity I'll call your actors Parent and Child from now on)

    What happens here is that when an exception occurs inside Child.receive, it doesn't send a response to Parent, instead, the actor system sends some control instruction for the supervision strategy. However, Parent is blocked on Await waiting for completion of future1, which only happens after the timeout exceeds, and then, in turn, a TimeoutException is thrown inside Parent.receive, killing (restarting) the Parent actor itself, and thus the supervising message of an exception in Child is then passed to deadLetters, never restarting the Child.

    You should never, ever, ever block inside an actor, so this is incorrect:

      val future1 = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean]
      var isDeletedfromES = Await.result(future1, timeout.duration)
    

    Instead, you have to either utilize some kind of message identification to distinguish one reply from another in concurrent environment, or add an onComplete to the Future and send a message to self in the closure (beware: no logic other than sending a message should be executed inside the closure to the Future!).

    So, option A:

    case class ArtGroupDeleteFromES(id: Long, uuidList: List[String])
    case class ArtGroupDeleteFromESResult(id: Long, success: Boolean)
    
    class Parent extends Actor {
      override val supervisionStrategy = ...
      var msgId = 0L
      var pendingRequesters = Map.empty[Long, ActorRef]
      val child = context.actorOf(Props[Child])
    
      def nextId = {
        msgId += 1
        msgId
      }
    
      def receive = {
        case DeleteArtGroup(uuidList) =>
          val id = nextId
          pendingRequesters += id -> sender() // store a reference to the sender so that you can send it a message when everything completes
          child ! DeleteArtGroupFromES(nextId, uuidList)
        case ArtGroupDeleteFromESResult(id, success) =>
          // process result...
          pendingRequesters(id) ! "done"
          pendingRequesters -= id
      }
    }
    

    And option B:

    case class ArtGroupDeleteFromES(uuidList: List[String])
    case class ArtGroupDeleteFromESResult(replyTo: ActorRef, success: Boolean)
    
    class Parent extends Actor {
      override val supervisionStrategy = ...
      val child = context.actorOf(Props[Child])
    
      def receive = {
        case DeleteArtGroup(uuidList) =>
          val requester = sender() // when the future completes, sender may have already changed, so you need to remember it
          (child ? DeleteArtGroupFromES(uuidList)).onComplete {
            case Success(success) => self ! ArtGroupDeleteFromESResult(requester, success)
            case Failure(e) =>
              log.warn("Could not delete...", e) 
              self ! ArtGroupDeleteFromESResult(requester, success = false)
      }
    }