java, scala, akka, actor

Does Akka have an ExecutorCompletionService equivalent, where Futures are queued in order of completion?


In Java, I can create an ExecutorCompletionService from an executor and submit a batch of tasks to it. As the documentation puts it, this class arranges that submitted tasks are, upon completion, placed on a queue accessible using take: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Does Akka have something similar for managing Futures returned by actors?


Solution

  • This answer is for Scala only. Scala provides Future.sequence and Future.firstCompletedOf to compose futures: each returns a new future that completes once all of the underlying futures have completed (sequence) or as soon as the first one has (firstCompletedOf), which covers both examples in the CompletionService API docs. This approach is safer than ecs.take().get() because nothing blocks if you register a listener such as onComplete; if you do want a blocking waiter, use Await.result. So there is no need for a CompletionService: a plain list of futures is flexible enough and safer. The equivalent of the first example:

      val solvers: List[() => Int] = ...
      val futures = solvers.map(s => Future { s() }) // start all tasks (needs an implicit ExecutionContext)
      Future.sequence(futures).foreach { results =>  // runs once every task has succeeded; results: List[Int]
          results.foreach(use)
      }
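To make the difference between the two combinators concrete, here is a minimal sketch that uses only the Scala standard library. The object name ComposeDemo and the use of Promise to control completion order are assumptions made for illustration; they are not part of the original answer.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ComposeDemo {
  // Promises are completed by hand so that the completion order is deterministic.
  def run(): (Int, Seq[Int]) = {
    val slow = Promise[Int]()
    val fast = Promise[Int]()
    val futures = List(slow.future, fast.future)

    val first = Future.firstCompletedOf(futures) // completes with the earliest result
    val all   = Future.sequence(futures)         // completes once every future has succeeded

    fast.success(2)
    val winner = Await.result(first, 5.seconds)  // blocking waiter, used here only for the demo
    slow.success(1)
    val results = Await.result(all, 5.seconds)   // results follow the order of the input list

    (winner, results)
  }
}
```

Note that sequence preserves the order of the input list rather than the order of completion, which is why it only matches the "wait for all results" use of a completion service.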
    

    The second example, cancelling the remaining tasks once one has finished:

      val solvers: List[Future[Int] => Int] = ... // tasks; each receives its own Future so it can check whether it was cancelled
      val (futures, cancels) = solvers.map(cancellableFuture(_)).unzip // see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
    
      Future.firstCompletedOf(futures).onComplete { result => // result: Try[Int]
          cancels.foreach(_())  // cancel the tasks that are still running
          result.foreach(use)   // use the value if the first task succeeded
      }
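The cancellableFuture helper is not a library function. The sketch below adapts the idea from the linked Stack Overflow answer, under a few assumptions: the CancelDemo object, the sample tasks, and the dedicated thread pool are all illustrative. The pool gives the never-ending tasks their own threads so they cannot starve the quick one. Cancellation here is cooperative, so each task must poll its own future to notice that it was cancelled.

```scala
import java.util.concurrent.{CancellationException, Executors}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CancelDemo {
  // Returns the task's future plus a function that cancels it by failing the promise.
  def cancellableFuture[T](task: Future[T] => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    p.completeWith(Future(task(f)))                       // ignored if already cancelled
    (f, () => p.tryFailure(new CancellationException))    // false if the task finished first
  }

  def run(): (Int, Boolean) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val quick: Future[Int] => Int = _ => 42
      val endless: Future[Int] => Int = self => {
        while (!self.isCompleted) Thread.sleep(10)        // stops once cancelled
        -1
      }
      val (futures, cancels) = List(quick, endless, endless).map(cancellableFuture[Int](_)).unzip
      val winner = Await.result(Future.firstCompletedOf(futures), 5.seconds)
      cancels.foreach(_())                                // cancel whatever is still running
      val losersCancelled =
        futures.tail.forall(f => Await.ready(f, 5.seconds).value.exists(_.isFailure))
      (winner, losersCancelled)
    } finally pool.shutdown()
  }
}
```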
    

    As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html

    If you just want to process results sequentially as they complete, you can use an actor for that:

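      import akka.pattern.pipe // provides pipeTo; an implicit ExecutionContext must be in scope

      val futures: List[Future[Int]] = ...
      futures.foreach(_ pipeTo actor) // the actor's mailbox serves as the completion queue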
    

    To model the completion queue's behavior directly (not recommended, because next() blocks):

      import scala.concurrent._
      import scala.concurrent.duration._
      import scala.concurrent.ExecutionContext.Implicits.global // some execution context
    
      class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
         // Each result carries its own future so that it can be removed from the pending set
         case class Result(f: Future[Result], r: T)
         var futures: Set[Future[Result]] = solvers.map { s =>
            lazy val f: Future[Result] = Future { Result(f, s()) }
            f
         }.toSet
    
         def hasNext: Boolean = futures.nonEmpty
    
         // Blocks until any of the pending futures completes, then returns its value
         def next(): T = {
            val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
            futures -= result.f
            result.r
         }
      }
    
    scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
    q: Queue[Int] = non-empty iterator
    
    scala> q.next
    res14: Int = 2
    
    scala> q.next
    res15: Int = 1
    
    scala> q.foreach(println)
    4
    3
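The Queue above calls firstCompletedOf over all pending futures on every next(). An alternative sketch, closer to how ExecutorCompletionService works, lets each future push its own result onto a java.util.concurrent.LinkedBlockingQueue when it completes. CompletionQueue and CompletionQueueDemo are hypothetical names, not part of Scala or Akka, and the class assumes a single consumer thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Hypothetical helper: results become available in completion order.
class CompletionQueue[T](futures: Seq[Future[T]]) {
  private val done = new LinkedBlockingQueue[Try[T]]()
  private var remaining = futures.size // only touched by the (single) consumer

  // Each future enqueues its own outcome, success or failure, when it completes.
  futures.foreach(f => f.onComplete(result => done.put(result)))

  def hasNext: Boolean = remaining > 0

  // Blocks until the next result is available.
  def take(): Try[T] = {
    remaining -= 1
    done.take()
  }
}

object CompletionQueueDemo {
  // Promises are completed by hand so that the completion order is known.
  def run(): List[Int] = {
    val a = Promise[Int]()
    val b = Promise[Int]()
    val c = Promise[Int]()
    val q = new CompletionQueue(List(a.future, b.future, c.future))

    b.success(2); val first  = q.take().get
    c.success(3); val second = q.take().get
    a.success(1); val third  = q.take().get
    List(first, second, third)
  }
}
```

Like ecs.take(), take() blocks the calling thread, so the same caveat applies: prefer callbacks or an actor's mailbox where possible.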