Search code examples
scalaactorfuturecallablecancellation

How to cancel (or just interrupt) a scala.concurrent.Future?


I have a Java library that performs long running blocking operations. The library is designed to respond to user cancellation requests. The library integration point is the Callable interface.

I need to integrate this library into my application from within an Actor. My initial thought was to do something like this:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val callable: java.util.concurrent.Callable[Void] = ???
val fut = Future {
  callable.call()
}
fut.onSuccess {
  case _ => // continue on success path
}
fut.onFailure {
  case throwable => // handle exceptions
}

I think this code will work properly in as much as it will not block the actor. But I don't know how I would provide a way to cancel the operation. Assume that while the callable is processing, the actor receives a message that indicates it should cancel the operation being worked on in the callable, and that the library is responsive to cancellation requests via interrupting the processing thread.

What is the best practice to submit a Callable from within an Actor and sometime later cancel the operation?

UPDATE

To be clear, the library exposes an instance of the java.util.concurrent.Callable interface. Callable in and of itself does not provide a cancel method. But the callable object is implemented in such a way that it is responsive to cancellation due to interrupting the thread. In java, this would be done by submitting the callable to an Executor. This would return a java.util.concurrent.Future. It is this Future object that provides the cancel method.

In Java I would do the following:

ExecutorService executor = ...
Callable c = ...
Future f = executor.submit(c);
...
// do more stuff
...
// If I need to cancel the callable task I just do this:
f.cancel(true);

It seems there is a disconnect between a java.util.concurrent.Future and scala.concurrent.Future. The java version provides a cancel method while the scala one does not.

In Scala I would do this:

// When the Akka Actor receives a message to process a 
// long running/blocking task I would schedule it to run
// on a different thread like this:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val callable: java.util.concurrent.Callable[Void] = ???
val fut = Future {
  callable.call()
}
fut.onSuccess {
  case _ => // continue on success path
}
fut.onFailure {
  case throwable => // handle exceptions
}

// But now, if/when the actor receives a message to cancel
// the task because it is taking too long to finish (even
// though it is running in the background) there is no way
// that I know of to cancel or interrupt the 
// scala.concurrent.Future.

Is there an idiomatic scala approach for cancelling a scala.concurrent.Future?


Solution

  • From what I understood your library is exposing an interface that has call and some cancel method, right? I'm assuming you can just call cancel whenever you want to. An example like the one below should get you started.

    class InterruptableThingy extends Actor {
      implicit val ctx = context.system.dispatchers.lookup("dedicated-dispatcher")      
    
      var counter = 0
      var tasks = Map.empty[Int, HasCancelMethod]
    
      def receive = {
        case "doThing" =>
          counter += 1
          val id = counter
    
          val thing = ???
          Future { thing.call() } onSuccess {} // ...
    
          tasks(id) = thing
          sender() ! id
    
        case Interrupt(id) => 
          tasks(id).cancel()
          tasks -= id
      }
    }
    case class Interrupt(taskId: Int)
    

    Please notice that we're using a dedicated dispatcher for the blocking Futures. This is a very good pattern as you can configure that dedicated dispatcher fittingly to your blocking workloads (and won't eat up resourced in the default dispatcher). Dispatchers are explained in more detail in the docs here: http://doc.akka.io/docs/akka/2.3.3/scala/dispatchers.html