With Java, I can create an ExecutorCompletionService from an executor and submit a bunch of tasks to it. This class arranges for submitted tasks to be placed, upon completion, on a queue accessible via take. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Does Akka have something similar for managing the Futures returned by actors?
This answer is for Scala only. In Scala, Future.sequence and Future.firstCompletedOf compose futures: each returns a new future that completes once all (sequence) or the first (firstCompletedOf) of the underlying futures has completed, which is equivalent to the examples in the CompletionService API docs. This is safer than ecs.take().get() because nothing blocks if you use an onComplete listener; if you still want a blocking waiter, use Await.result. So there is no need for a CompletionService: a list of futures is flexible enough and much safer. Equivalent of the first example:
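import scala.util.{Failure, Success}
val solvers: List[() => Int] = ...
val futures = solvers.map(s => Future { s() }) // start all tasks
Future.sequence(futures).onComplete { // onComplete receives a Try, not the bare result
  case Success(results) => results.foreach(use)
  case Failure(e) => e.printStackTrace() // placeholder failure handling
}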
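A self-contained version of the same idea may be easier to try out. This is only a sketch: the three solvers and the names SequenceDemo and runAll are made up for illustration, and the global execution context is assumed. It shows both the non-blocking callback and the blocking Await.result variant mentioned above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SequenceDemo {
  // Hypothetical tasks standing in for the `solvers` list.
  val solvers: List[() => Int] = List(() => 1, () => 2, () => 3)

  // Start every task, then combine the futures into one.
  // Future.sequence keeps the results in the order of the input list.
  def runAll(): Future[List[Int]] =
    Future.sequence(solvers.map(s => Future(s())))

  def main(args: Array[String]): Unit = {
    val all = runAll()
    // Non-blocking: the callback runs only if every task succeeded.
    all.foreach(results => println(s"callback: $results"))
    // Blocking variant, for callers that really have to wait.
    val results = Await.result(all, 5.seconds)
    println(s"awaited: $results")
  }
}
```

Note that the combined future reports results in input order, not completion order; that is usually fine when all results are needed anyway.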
Another example is cancelling the task:
val solvers: List[Future[Int] => Int] = ... // tasks; each receives its own Future so it can check whether it was cancelled
val (futures, cancels) = solvers.map(s => cancellableFuture(s)).unzip // see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
Future.firstCompletedOf(futures).foreach { result => // runs only if the first completed future succeeded
  cancels.foreach(cancel => cancel())
  use(result)
}
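For completeness, here is a runnable sketch of the cancellation example. The cancellableFuture helper below is my assumption of what the linked answer provides (a future plus a function that cancels it by failing the underlying promise); the fast and slow tasks and the names CancelDemo and firstResult are invented for illustration.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CancelDemo {
  // Assumed helper: the task receives its own Future and can poll
  // isCompleted to notice that it has been cancelled.
  def cancellableFuture[T](task: Future[T] => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    p.completeWith(Future(task(f)))
    // Cancelling fails the promise; returns false if it was already completed.
    (f, () => p.tryFailure(new CancellationException))
  }

  // Hypothetical tasks: one finishes at once, one polls until cancelled
  // (or gives up after roughly two seconds).
  val fast: Future[Int] => Int = _ => 1
  val slow: Future[Int] => Int = self => {
    var i = 0
    while (!self.isCompleted && i < 200) { blocking(Thread.sleep(10)); i += 1 }
    -1
  }

  def firstResult(): Int = {
    val (futures, cancels) = List(fast, slow).map(s => cancellableFuture(s)).unzip
    // Blocking here only to keep the demo short; onComplete works as well.
    val first = Await.result(Future.firstCompletedOf(futures), 5.seconds)
    cancels.foreach(cancel => cancel()) // stop whatever is still running
    first
  }

  def main(args: Array[String]): Unit = println(firstResult())
}
```

Cancellation here is cooperative: a task that never checks its Future keeps running on its thread even after the promise has been failed.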
As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html
If you just want to process results sequentially as they complete, you can use an actor for that:
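import akka.pattern.pipe // provides pipeTo; requires an implicit ExecutionContext in scope
val futures: List[Future[Int]] = ...
futures.foreach(_ pipeTo actor) // the actor's mailbox is used as the queue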
To model the completion queue's behavior (which is not recommended):
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global // some execution context

class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
  // Pairs each result with the future that produced it, so that future can be dropped once consumed.
  case class Result(f: Future[Result], r: T)

  var futures: Set[Future[Result]] = solvers.map { s =>
    lazy val f: Future[Result] = Future { Result(f, s()) }
    f
  }.toSet

  def hasNext = futures.nonEmpty

  def next() = {
    // Blocks until any of the remaining futures completes.
    val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
    futures -= result.f
    result.r
  }
}
scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator
scala> q.next
res14: Int = 2
scala> q.next
res15: Int = 1
scala> q.foreach(println)
4
3