
Cancel a Future after stopping an actor


I have an application with a Master-Worker architecture. The Worker has to run a heavy, long-running job, and the Master needs to be able to kill that job on demand.

At first I ran the job without a Future, but then the Worker could not receive any messages while it was working. So I tried running the job in a Future instead. However, when the Worker was stopped, the job kept running. How can I release the resources after stopping the actor?

Here is the code.

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}

import scala.concurrent.Future
import scala.concurrent.duration._


object Main {

  object StopTask

  case class DoTask(task: String)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val master = system.actorOf(Props[Master], "master")
    master ! "FooTask"
    import system.dispatcher
    system.scheduler.scheduleOnce(5.seconds) {
      master ! StopTask
    }
  }

  class Master extends Actor {
    val worker: ActorRef = context.actorOf(Props[Worker], "worker")

    def receive: Receive = {
      case task: String => worker ! DoTask(task)
      case StopTask => context stop worker
    }
  }

  class Worker extends Actor {

    import context.dispatcher

    override def postStop(): Unit = {
      println("Stopping task...")
    }

    def receive: Receive = {
      case DoTask(task) =>
        Future {
          // Heavy, long-running job here
          while (true) {
            println(s"Doing $task...")
            Thread.sleep(1000)
          }
        }
    }
  }

}

The output is:

[INFO ] 2018-04-08 21:48:33,947 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO ] 2018-04-08 21:48:34,244 akka.remote.Remoting - Starting remoting
[INFO ] 2018-04-08 21:48:34,463 akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,466 akka.remote.Remoting - Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,521 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Starting up...
[INFO ] 2018-04-08 21:48:34,717 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Registered cluster JMX MBean [akka:type=Cluster,port=49770]
[INFO ] 2018-04-08 21:48:34,718 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Started up successfully
Doing FooTask...
[INFO ] 2018-04-08 21:48:34,777 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Metrics collection has started successfully
[INFO ] 2018-04-08 21:48:35,017 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2560]
Doing FooTask...
Doing FooTask...
Doing FooTask...
Doing FooTask...
Stopping task...
Doing FooTask...
Doing FooTask...

I have found a way to kill a Future, but I don't know how to integrate it into this architecture. I hope someone can help.


Solution

  • Since my original answer (breaking up a Future's execution by recursively chaining it with itself) didn't really solve the problem, I did a little research and saw this idea from Dr Roland Kuhn: https://groups.google.com/d/msg/akka-user/nkD5BN17kVk/RpUbm07rvpMJ

    ... you can also spawn a dedicated thread for running that and have an actor manage it; in this case you can call Thread.interrupt or Thread.stop any way you like while keeping the actor responsive.

    I played around with this idea of having an actor manage starting and stopping a thread. Since actors are a good way to manage potentially-expensive resources, an actor is indeed a good candidate to manage a thread. Here's my implementation:

    import akka.actor.{Actor, ActorRef}

    /** Does the main work (i.e. training ML model). */
    class WorkerThread(task: String, parent: ActorRef) extends Runnable {
      override def run(): Unit = try {
        while (true) {
          println(s"Doing $task...")
          Thread.sleep(500)
        }
      } catch {
        /* Since this thread may be interrupted at any time, we need to
           gracefully handle being interrupted. Since we have a handle to the
           actor that's managing us, we can send it a message telling it to
           finish up. */
        case _: InterruptedException => parent ! Worker.Message.FinishUp
      }
    }
    
    /** Manages starting and stopping the model training thread. */
    class Worker extends Actor {
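      // The running worker thread, or null when no task is in progress.
      private var thread: Thread = null
    
      override def receive: Receive = {
        case Worker.Message.DoTask(task) =>
          // Start at most one task at a time.
          if (thread == null) {
            thread = new Thread(new WorkerThread(task, self))
            thread.start()
          }
        case Worker.Message.StopTask =>
          if (thread != null) thread.interrupt()
        case Worker.Message.FinishUp =>
          // The thread has exited; clear the handle so a new task can be started.
          thread = null
          println("Stopped task...")
      }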
    }
    
    object Worker {
      sealed trait Message
      object Message {
        case class DoTask(task: String) extends Message
        case object StopTask extends Message
        case object FinishUp extends Message
      }
    }
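
One gap remains relative to the question: the Master there calls `context stop worker` rather than sending a stop message, and a Worker that only reacts to `StopTask` would leave its thread running in that case. A minimal sketch of one way to cover this, assuming the same dedicated-thread approach, is to interrupt the thread from `postStop` as well. The names `StoppableWorker` and `interruptTask` are hypothetical, and naming the thread and marking it as a daemon are illustrative choices only.

```scala
import akka.actor.Actor

object StoppableWorker {
  final case class DoTask(task: String)
  case object StopTask
}

/** Hypothetical variant of the worker that also cleans up when the actor is stopped. */
class StoppableWorker extends Actor {
  import StoppableWorker._

  // The thread currently running a task, if any.
  private var thread: Option[Thread] = None

  // Interrupts the running task (if there is one) and forgets the handle.
  private def interruptTask(): Unit = {
    thread.foreach(_.interrupt())
    thread = None
  }

  override def receive: Receive = {
    case DoTask(task) if thread.isEmpty =>
      val t = new Thread(new Runnable {
        override def run(): Unit =
          try {
            while (true) {
              println(s"Doing $task...")
              Thread.sleep(500)
            }
          } catch {
            // The actor may already be stopped here, so only log
            // instead of sending it a message.
            case _: InterruptedException => println(s"Stopped $task")
          }
      }, s"task-$task")
      t.setDaemon(true)
      t.start()
      thread = Some(t)
    case _: DoTask => () // a task is already running; ignore the request
    case StopTask  => interruptTask()
  }

  // Called by Akka when the actor stops, e.g. after `context stop worker`.
  override def postStop(): Unit = interruptTask()
}
```

With this in place, the Master from the question could keep using `context stop worker`, or send `StoppableWorker.StopTask` when the actor itself should stay alive.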
    

    Now, we can stop the ML model training at any time using its managing actor. It turns out a Future was the wrong level of abstraction to use, but a Thread is the right one.
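
One caveat is worth spelling out: `Thread.interrupt` is cooperative. It raises `InterruptedException` only if the thread is blocked in an interruptible call such as `Thread.sleep`; otherwise it just sets a flag, so a CPU-bound loop has to poll that flag itself. The sketch below illustrates this using only the standard library. `InterruptDemo` and `startJob` are hypothetical names, not part of Akka or of the implementation above.

```scala
object InterruptDemo {
  /** Hypothetical helper: runs `step` repeatedly on a dedicated
    * daemon thread until that thread is interrupted. */
  def startJob(name: String)(step: => Unit): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit =
        try {
          // Poll the interrupt flag so a CPU-bound step can be stopped too.
          while (!Thread.currentThread().isInterrupted) step
        } catch {
          // Thrown when the interrupt arrives while the thread is blocked,
          // e.g. inside Thread.sleep.
          case _: InterruptedException => ()
        }
    }, name)
    t.setDaemon(true)
    t.start()
    t
  }
}
```

For example, `val job = InterruptDemo.startJob("cpu-job") { /* one chunk of work */ }` followed by `job.interrupt()` lets the loop exit once the current chunk finishes. Work that cannot be split into chunks, and never blocks, will not notice the interrupt.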