I have an application with a Master-Worker architecture. The Worker needs to do a heavy, long-running job, and the Master needs to be able to kill that job whenever we need to.
I first tried not using a Future, but then the Worker cannot receive any messages while it is working. So I tried using a Future instead. However, when the Worker was stopped, the job was still running. How can I release the resources after stopping the actor?
Here is the code.
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.Future
import scala.concurrent.duration._

object Main {
  case object StopTask
  case class DoTask(task: String)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val master = system.actorOf(Props[Master], "master")
    master ! "FooTask"

    import system.dispatcher
    system.scheduler.scheduleOnce(5.seconds) {
      master ! StopTask
    }
  }

  class Master extends Actor {
    val worker: ActorRef = context.actorOf(Props[Worker], "worker")

    def receive: Receive = {
      case task: String => worker ! DoTask(task)
      case StopTask     => context stop worker
    }
  }

  class Worker extends Actor {
    import context.dispatcher

    override def postStop(): Unit = {
      println("Stopping task...")
    }

    def receive: Receive = {
      case DoTask(task) =>
        Future {
          // High loading job here
          while (true) {
            println(s"Doing $task...")
            Thread.sleep(1000)
          }
        }
    }
  }
}
The output is...
[INFO ] 2018-04-08 21:48:33,947 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO ] 2018-04-08 21:48:34,244 akka.remote.Remoting - Starting remoting
[INFO ] 2018-04-08 21:48:34,463 akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,466 akka.remote.Remoting - Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,521 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Starting up...
[INFO ] 2018-04-08 21:48:34,717 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Registered cluster JMX MBean [akka:type=Cluster,port=49770]
[INFO ] 2018-04-08 21:48:34,718 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Started up successfully
Doing FooTask...
[INFO ] 2018-04-08 21:48:34,777 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Metrics collection has started successfully
[INFO ] 2018-04-08 21:48:35,017 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2560]
Doing FooTask...
Doing FooTask...
Doing FooTask...
Doing FooTask...
Stopping task...
Doing FooTask...
Doing FooTask...
I have found a way to kill a Future, but I don't know how to integrate it into this architecture. I hope someone can help me.
Since my original answer (breaking up a Future's execution by recursively chaining it with itself) didn't really solve the problem, I did a little research and saw this idea from Dr Roland Kuhn: https://groups.google.com/d/msg/akka-user/nkD5BN17kVk/RpUbm07rvpMJ
... you can also spawn a dedicated thread for running that and have an actor manage it; in this case you can call Thread.interrupt or Thread.stop any way you like while keeping the actor responsive.
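A minimal, actor-free sketch of that idea, using only the Scala and Java standard libraries: the long-running loop lives on its own dedicated thread, and whoever owns the thread stops it with `interrupt()` and then `join`s it. The names `InterruptibleJob`, `step`, `iterations` and `finishedCleanly` are hypothetical and chosen only for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

/** Hypothetical helper: runs `step` repeatedly on a dedicated thread until interrupted. */
final class InterruptibleJob(step: () => Unit) {
  val iterations = new AtomicInteger(0)
  @volatile var finishedCleanly = false

  private val thread = new Thread(new Runnable {
    override def run(): Unit =
      try {
        while (true) {
          step()                       // one unit of the long-running work
          iterations.incrementAndGet()
          Thread.sleep(10)             // interruptible: throws InterruptedException once interrupted
        }
      } catch {
        case _: InterruptedException => finishedCleanly = true // release resources here
      }
  })

  def start(): Unit = thread.start()

  /** Interrupts the worker thread and waits up to `timeoutMs` for it to exit; true if it did. */
  def stop(timeoutMs: Long = 1000L): Boolean = {
    thread.interrupt()
    thread.join(timeoutMs)
    !thread.isAlive
  }
}
```

In an actor-based design, the managing actor would be the one calling `start()` and `stop()`. It stays responsive because the loop never runs on the actor's dispatcher.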
I played around with this idea of having an actor manage starting and stopping a thread. Since actors are a good way to manage potentially expensive resources, an actor is indeed a good candidate for managing a thread. Here's my implementation:
import akka.actor.{Actor, ActorRef}

/** Does the main work (e.g. training an ML model). */
class WorkerThread(task: String, parent: ActorRef) extends Runnable {
  override def run(): Unit = try {
    while (true) {
      println(s"Doing $task...")
      Thread sleep 500
    }
  } catch {
    /* Since this thread may be interrupted at any time, we need to
       gracefully handle being interrupted. Since we have a handle to the
       actor that's managing us, we can send it a message telling it to
       finish up. */
    case _: InterruptedException => parent ! Worker.Message.FinishUp
  }
}

/** Manages starting and stopping the model training thread. */
class Worker extends Actor {
  private var thread: Thread = null

  override def receive: Receive = {
    case Worker.Message.DoTask(task) =>
      if (thread == null) {
        thread = new Thread(new WorkerThread(task, self))
        thread.start()
      }
    case Worker.Message.StopTask =>
      if (thread != null) thread.interrupt()
    case Worker.Message.FinishUp =>
      thread = null // allow a new task to be started afterwards
      println("Stopped task...")
  }

  // If this actor itself is stopped (e.g. via context.stop), interrupt the
  // thread too, so the job does not outlive the actor that manages it.
  override def postStop(): Unit =
    if (thread != null) thread.interrupt()
}

object Worker {
  sealed trait Message
  object Message {
    case class DoTask(task: String) extends Message
    case object StopTask extends Message
    case object FinishUp extends Message
  }
}
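The `WorkerThread` above stops because `Thread sleep 500` throws `InterruptedException` when the thread is interrupted. A real training loop that never blocks would not see that exception: on a thread that is not blocked, `interrupt()` only sets a flag. A minimal sketch of a CPU-bound job that polls that flag itself follows. `CpuBoundJob` and its fields are hypothetical, and the inner `sqrt` loop merely stands in for one training step.

```scala
/** Hypothetical CPU-bound job: it never blocks, so it must poll the interrupt flag itself. */
final class CpuBoundJob extends Runnable {
  @volatile var stepsDone: Long = 0L
  @volatile var stoppedByInterrupt = false
  @volatile var checksum = 0.0

  override def run(): Unit = {
    var acc = 0.0
    // interrupt() only sets a flag on a thread that is not blocked, so check it every step.
    while (!Thread.currentThread.isInterrupted) {
      var i = 0
      while (i < 10000) { acc += math.sqrt(i.toDouble); i += 1 } // stand-in for one training step
      stepsDone += 1
    }
    checksum = acc
    stoppedByInterrupt = true // clean up / notify the managing actor here
  }
}
```

To fit this into the design above, `WorkerThread.run` could loop on the same `isInterrupted` check and send `FinishUp` to its parent after the loop exits, rather than relying only on the `catch` clause.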
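Now we can stop the ML model training at any time by sending StopTask to its managing actor. Note that Thread.interrupt is cooperative: it only takes effect when the job is blocked in an interruptible call such as Thread.sleep, or when the job checks Thread.currentThread.isInterrupted itself. It turns out a Future was the wrong level of abstraction to use, but a Thread is the right one.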