Tags: scala, concurrency, parallel-processing, enumerator

How to convert a sequence of Future tasks to an Enumerator that consumes the tasks as they complete


I'm running expensive calculations in batches and returning the results as a chunked HTTP response. Occasionally some calculations are "more expensive than the others", which can cause timeouts on the client.

Currently, the list of calculations is combined into an Enumerator like this:

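import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.iteratee.Enumerator

def processBatch(batch: List[B]): Enumerator[A] = {
    val listOfFutures: List[Future[A]] = batch.map(....)
    // Completes only when every future in the list has completed
    val futureList: Future[List[A]] = Future.sequence(listOfFutures)
    Enumerator.flatten(futureList.map(Enumerator.enumerate(_)))
}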

With this implementation, the Enumerator receives input only after the whole Future list has completed. This is not good, because some values may already be ready but still have to wait for an expensive task to complete.
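To make the delay concrete, here is a small sketch using only the Scala standard library (no Play involved). `SequenceDelayDemo`, `cheap` and `expensive` are hypothetical stand-ins for a fast and a slow calculation; the point is that the `Future.sequence` result stays pending while any element is still running, even though `cheap` already holds its value.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDelayDemo {
  // Hypothetical stand-ins for one cheap and one expensive calculation
  val cheap: Future[Int] = Future.successful(1)
  val expensive: Future[Int] = Future { Thread.sleep(500); 2 }

  // Completes only once BOTH elements have completed
  val all: Future[List[Int]] = Future.sequence(List(cheap, expensive))

  def main(args: Array[String]): Unit = {
    // cheap already has its value, but `all` is typically still pending here
    println(s"cheap done: ${cheap.isCompleted}, sequence done: ${all.isCompleted}")
    // Blocks until the expensive calculation finishes; results keep list order
    println(Await.result(all, 5.seconds))
  }
}
```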

Is there a better way to do this, i.e. feed each value to the Enumerator as soon as its Future completes, rather than as one batch once all the Futures have completed?


Solution

  • Future.sequence means "the list of all these futures, once all of them have completed", so you cannot use it here.

    Play offers a more imperative way to build an enumerator: play.api.libs.iteratee.Concurrent.unicast. With it, each future pushes its result into a Channel as soon as it completes. The values pushed into the Channel are exposed as an Enumerator that you can consume with an Iteratee.

    This way, the first result to complete is the first to arrive, and so on.

    You can find the docs for it here
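A minimal sketch of that idea, assuming Play 2.x's iteratee library (`play.api.libs.iteratee`) is on the classpath. `CompletionOrder` and `inCompletionOrder` are hypothetical names, not part of Play. Each future pushes its value into the unicast `Channel` from its own `onComplete` callback, so values reach the consumer in completion order; a counter closes the stream once the last future has delivered, and the first failure ends the stream with that error. It also assumes the channel accepts pushes from several callback threads.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import play.api.libs.iteratee.{Concurrent, Enumerator}

object CompletionOrder {
  /** Emits each future's result as soon as that future completes
    * (completion order, not list order). Hypothetical helper. */
  def inCompletionOrder[A](futures: List[Future[A]])(implicit ec: ExecutionContext): Enumerator[A] =
    if (futures.isEmpty) Enumerator.empty[A] // otherwise nothing would ever end the channel
    else Concurrent.unicast[A](onStart = channel => {
      // Number of futures that have not yet delivered a value
      val remaining = new AtomicInteger(futures.size)
      futures.foreach { f =>
        f.onComplete {
          case Success(a) =>
            channel.push(a)
            // The last future to finish sends EOF and closes the stream
            if (remaining.decrementAndGet() == 0) channel.eofAndEnd()
          case Failure(e) =>
            // Fail the whole stream on the first error
            channel.end(e)
        }
      }
    })
}
```

With this helper, `processBatch` could return `CompletionOrder.inCompletionOrder(listOfFutures)` instead of going through `Future.sequence` and `Enumerator.flatten`. The empty-list case is handled separately because, with no futures, no callback would ever end the channel and the consumer would wait forever.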