Tags: scala, akka, akka-stream

Akka Source not streaming data?


val pageDataFutures: Seq[Future[PageData]] = ??? // 4 API calls, each resulting in a Future of PageData

def source: Source[PageData, NotUsed] = Source(
  pageDataFutures.flatMap(future => Await.result(future, atMost)).toList
)

source.runForeach(println)

I expected the source to run 'runForeach' as each future completes. Instead, all 4 API calls are made and then the data in the source is printed all at once. Shouldn't it print the data as and when it becomes available? I am using Await on each future, so the result of the previous future is guaranteed to be available, and usable by println, before the next future is awaited.


Solution

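  • The argument to Source(...) is evaluated before the Source is created, so every Await.result has already returned by the time the stream starts and nothing can be printed earlier. To let the stream itself wait on each future, use mapAsync: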

    Pass incoming elements to a function that returns a Future result. When the Future completes, the result is passed downstream. Up to n elements can be processed concurrently, but regardless of their completion time the incoming order is kept when results are emitted.

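    import akka.stream.scaladsl.Source
    import scala.concurrent.Future

    val pageDataFutures: Seq[Future[PageData]] = ???

    // Assumes an implicit Materializer (or, in Akka 2.6+, an ActorSystem) is in scope.
    Source(pageDataFutures.toList) // Source.apply takes an immutable collection
      .mapAsync(parallelism = 1)(identity) // same as x => x: unwrap each future in order
      .runForeach(println)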
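To see the behaviour end to end, here is a minimal self-contained sketch of the mapAsync approach. It assumes Akka 2.6 or later, where an implicit ActorSystem also supplies the materializer. The PageData case class, the fetchPage helper and the 500 ms delays are hypothetical stand-ins for the real API calls, not part of the original question.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapAsyncDemo {
  // Hypothetical stand-in for the question's PageData type.
  final case class PageData(page: Int)

  def runDemo(): Seq[PageData] = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher // ExecutionContext required by `after`

    // Simulated API call: page n completes after n * 500 ms without blocking a thread.
    def fetchPage(page: Int): Future[PageData] =
      after((page * 500).millis, system.scheduler)(Future.successful(PageData(page)))

    // All four futures start running here, before the stream is built.
    val pageDataFutures: List[Future[PageData]] = (1 to 4).map(fetchPage).toList

    val start = System.nanoTime()
    val result = Source(pageDataFutures)
      .mapAsync(parallelism = 1)(identity) // wait for each future in order, without Await
      .map { pd =>
        // Print the elapsed time so the staggered arrival is visible.
        println(s"+${(System.nanoTime() - start) / 1000000} ms: $pd")
        pd
      }
      .runWith(Sink.seq)

    // Blocking here is only to keep the demo alive until the stream finishes.
    try Await.result(result, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    runDemo()
    ()
  }
}
```

Under these assumptions, each line should be printed roughly when its future completes rather than all at the end. Because mapAsync preserves the incoming order, a fast later future still waits behind a slower earlier one. If completion order matters more than input order, mapAsyncUnordered is the usual alternative.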