My parent actor looks like this:
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import scala.concurrent.duration._

case object StartRemoteProcessor

class ConnectorActor extends Actor with ActorLogging {

  // Restart the failing child at most 3 times within 20 seconds.
  override def supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 20.seconds) {
    case e: OutOfMemoryError =>
      log.error("Exception received => " + e.getMessage)
      Restart
    case e: IllegalArgumentException =>
      log.error("Exception received => " + e.getMessage)
      Restart
  }

  def receive = LoggingReceive {
    case StartRemoteProcessor =>
      val remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
      log.info("Starting Remote Processor")
      remoteProcessor ! "Start"
    case "ProcessingStopped" =>
      notifyFailure()
  }

  def notifyFailure() = {
    log.info("notifying failure to server")
  }
}
As per the docs:
The child actor is stopped if the limit is exceeded.
Requirement: when maxNrOfRetries is exhausted, I want to take a custom action, notifyFailure, when the actor is finally stopped. This will send an email.
On my child actor, I have:
import akka.actor.{Actor, ActorLogging}
import akka.event.LoggingReceive

class ProcessingActor extends Actor with ActorLogging {

  // After a restart, re-send "Start" to self on behalf of the parent.
  override def aroundPostRestart(reason: Throwable): Unit = self.tell("Start", context.parent)

  override def preStart(): Unit = ()

  override def postStop(): Unit = context.parent ! "ProcessingStopped"

  def receive = LoggingReceive {
    case "Start" =>
      log.info("ProcessingActor path => " + self.path)
      startProcessing()
  }

  def startProcessing() = {
    println("executing startProcessing")
    throw new IllegalArgumentException("not implemented by choice")
  }
}
But in the logs, I see that notifyFailure is called on every restart:
[INFO] [07/24/2015 11:57:50.107] [main] [Remoting] Starting remoting
[INFO] [07/24/2015 11:57:50.265] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ConnectorSystem@127.0.0.1:2554]
[INFO] [07/24/2015 11:57:50.266] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ConnectorSystem@127.0.0.1:2554]
ConnectorSystem Started
[INFO] [07/24/2015 11:57:50.277] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Starting Remote Processor
[ERROR] [07/24/2015 11:57:50.509] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.511] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.526] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.528] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.533] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.534] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.538] [ConnectorSystem-akka.actor.default-dispatcher-3] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] Exception received => not implemented by choice
[ERROR] [07/24/2015 11:57:50.540] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ProcessingSystem@127.0.0.1:2552/remote/akka.tcp/ConnectorSystem@127.0.0.1:2554/user/connectorActor/processingActor] not implemented by choice
java.lang.IllegalArgumentException: not implemented by choice
at com.learn.remote.processing.ProcessingActor.startProcessing(ProcessingActor.scala:23)
at com.learn.remote.processing.ProcessingActor$$anonfun$receive$1.applyOrElse(ProcessingActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.learn.remote.processing.ProcessingActor.aroundReceive(ProcessingActor.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [07/24/2015 11:57:50.545] [ConnectorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ConnectorSystem@127.0.0.1:2554/user/connectorActor] notifying failure to server
How can I achieve this behavior?
The answer from daydreamer will work, but an alternative approach is to watch the child actor from the parent and execute notifyFailure when you receive the Terminated message:
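// Inside ConnectorActor; also needs: import akka.actor.{ActorRef, Terminated}
var remoteProcessor: ActorRef = _

def receive = LoggingReceive {
  case StartRemoteProcessor =>
    remoteProcessor = context.actorOf(Props[ProcessingActor], "processingActor")
    // Register for a Terminated message once the child has stopped for good.
    context.watch(remoteProcessor)
    log.info("Starting Remote Processor")
    remoteProcessor ! "Start"
  // The guard restricts the match to the watched child; a bare
  // Terminated(remoteProcessor) would bind a new variable and match any ref.
  case Terminated(ref) if ref == remoteProcessor =>
    notifyFailure()
}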
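This way you don't need to customize the actor lifecycle methods, which I find can be a rich source of bugs. Here, for example, the default preRestart calls postStop, so the child's postStop override sends "ProcessingStopped" on every restart rather than only on the final stop. Terminated, by contrast, is delivered only once the child has actually stopped.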
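One Scala detail to keep in mind when matching on Terminated: a lowercase name inside a pattern binds a fresh variable instead of comparing against an existing field with the same name. Restricting the match to one specific child therefore takes a guard. The sketch below shows the difference in plain Scala. Ref, Stopped and PatternDemo are hypothetical stand-ins invented for this illustration, not Akka types.

```scala
// Hypothetical stand-ins for illustration only; these are not Akka classes.
final case class Ref(name: String)
final case class Stopped(ref: Ref)

object PatternDemo {
  val watched: Ref = Ref("processingActor")

  // `watched` inside the pattern is a NEW variable that shadows the field,
  // so this case matches every Stopped message, whatever ref it carries.
  def matchesAny(msg: Any): Boolean = msg match {
    case Stopped(watched) => true
    case _                => false
  }

  // The guard compares the extracted ref with the field,
  // so only a Stopped message for the watched ref matches.
  def matchesWatchedOnly(msg: Any): Boolean = msg match {
    case Stopped(ref) if ref == watched => true
    case _                              => false
  }
}
```

With a single watched child the two forms behave the same in practice, but the guard keeps the handler correct if the parent later watches more than one actor.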