How to implement a stream with skip and conditional stop

I'm trying to implement batch processing. My algorithm:

1) First, I need to request items from the DB, starting with skip = 0. If there are no items, stop processing completely.

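  import scala.concurrent.{ExecutionContext, Future}
  implicit val ec: ExecutionContext = ExecutionContext.global

  case class Item(i: Int)

  // Fake DB query: a full page of 100 items while skip < 756, then an empty page
  def getItems(skip: Int): Future[Seq[Item]] = {
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
  }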

2) Then, for every item, run a heavy job (parallelism = 4).

  def heavyJob(item: Item): Future[String] = Future {
    item.i.toString + " done"
  }

3) After all items have been processed, go back to step 1 with skip += 100.
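For reference, steps 1 to 3 can be sketched without Akka as a recursive `Future` loop. This is a minimal sketch, not the stream-based solution being asked about: `processPage` and `run` are hypothetical helper names, and the parallelism of 4 is approximated by processing each page in consecutive chunks of 4 (each chunk finishes before the next one starts), which is coarser than a true sliding window of 4 in-flight jobs.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PagedBatch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  case class Item(i: Int)

  // Same fake DB as above: a full page of 100 items while skip < 756, then empty.
  def getItems(skip: Int): Future[Seq[Item]] =
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_)))

  def heavyJob(item: Item): Future[String] = Future(item.i.toString + " done")

  // Step 2 (hypothetical helper): run heavyJob over one page, at most
  // `parallelism` jobs at a time, by processing the page chunk by chunk.
  def processPage(items: Seq[Item], parallelism: Int): Future[Vector[String]] =
    items.grouped(parallelism).foldLeft(Future.successful(Vector.empty[String])) {
      (accF, chunk) => accF.flatMap(acc => Future.traverse(chunk)(heavyJob).map(acc ++ _))
    }

  // Steps 1 and 3 (hypothetical helper): fetch a page, stop if it is empty,
  // otherwise process it and recurse with skip + pageSize.
  def run(skip: Int = 0, pageSize: Int = 100, parallelism: Int = 4): Future[Vector[String]] =
    getItems(skip).flatMap { items =>
      if (items.isEmpty) Future.successful(Vector.empty[String])
      else
        processPage(items, parallelism).flatMap { done =>
          run(skip + pageSize, pageSize, parallelism).map(rest => done ++ rest)
        }
    }

  def main(args: Array[String]): Unit = {
    val results = Await.result(run(), 30.seconds)
    println(s"${results.size} items processed, last: ${results.last}")
  }
}
```

With the fake DB above, pages at skip = 0, 100, ..., 700 are full and the query at skip = 800 comes back empty, so 800 items are processed before the loop stops.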

What I'm trying:

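import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

// Akka 2.6+: an implicit ActorSystem in scope supplies the materializer
implicit val system: ActorSystem = ActorSystem("batch")

// Emits a single element: the first page only (skip = 0)
val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))

val flattened: Source[Item, _] = dbSource.mapConcat(identity)

val processed: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))

processed.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))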

But I don't know how to implement the pagination: this stream only ever reads the first page (skip = 0).


  • The skip incrementing can be handled with an Iterator as the underlying source of values:

    val skipIncrement = 100
    // Infinite sequence of offsets: 0, 100, 200, ...
    val skipIterator : () => Iterator[Int] =
      () => Iterator.from(0, skipIncrement)
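    A quick check of what `skipIterator` produces, as a plain Scala sketch (the object name `SkipIteratorDemo` is only for illustration). Because `Source.fromIterator` takes a function `() => Iterator[T]` rather than an iterator, each materialization of the stream can get its own fresh iterator starting at 0:

    ```scala
    object SkipIteratorDemo {
      val skipIncrement = 100

      // Each call returns a new, infinite iterator: 0, 100, 200, ...
      val skipIterator: () => Iterator[Int] =
        () => Iterator.from(0, skipIncrement)

      def main(args: Array[String]): Unit = {
        val a = skipIterator()
        a.next(); a.next()              // a has now consumed 0 and 100
        println(a.next())               // 200
        println(skipIterator().next())  // 0: a fresh iterator is unaffected by a
      }
    }
    ```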

    This Iterator can then be used to drive an Akka Source that fetches the items and keeps going until a query returns an empty Seq:

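    val databaseStillHasValues : Seq[Item] => Boolean = _.nonEmpty
    val itemSource : Source[Item, _] =
      Source.fromIterator(skipIterator)      // offsets 0, 100, 200, ...
        .mapAsync(1)(getItems)               // one page query at a time
        .takeWhile(databaseStillHasValues)   // stop at the first empty page
        .mapConcat(_.toList)                 // flatten each page into items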
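    The behaviour of `itemSource` can be approximated with a plain, blocking Scala `Iterator`, which makes it easy to see where the infinite offset sequence gets cut off. This is only an analogy: `Await.result` stands in for `mapAsync(1)`, `Iterator.takeWhile` for the stream's `takeWhile`, and `flatMap` for `mapConcat`; the `queries` counter is a hypothetical addition used to count database calls.

    ```scala
    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object ItemIteratorDemo {
      implicit val ec: ExecutionContext = ExecutionContext.global

      case class Item(i: Int)

      // Hypothetical counter of how many times the "database" is queried.
      val queries = new AtomicInteger(0)

      // Same fake DB as in the question: full pages while skip < 756, then empty.
      def getItems(skip: Int): Future[Seq[Item]] = {
        queries.incrementAndGet()
        Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_)))
      }

      val databaseStillHasValues: Seq[Item] => Boolean = dbValues => dbValues.nonEmpty

      // Blocking analogue of fromIterator -> mapAsync(1) -> takeWhile -> mapConcat.
      def itemIterator(): Iterator[Item] =
        Iterator.from(0, 100)
          .map(skip => Await.result(getItems(skip), 5.seconds)) // one page at a time
          .takeWhile(databaseStillHasValues)                     // stop at first empty page
          .flatMap(page => page)                                 // flatten pages into items

      def main(args: Array[String]): Unit = {
        val items = itemIterator().toList
        // 8 full pages (skip = 0 to 700) are read; the 9th query (skip = 800)
        // returns an empty page, which ends the iteration.
        println(s"${items.size} items, ${queries.get} queries") // 800 items, 9 queries
      }
    }
    ```

    Exactly one extra query (the empty page) is made before stopping, which is the same termination condition the stream version relies on.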

    The heavyJob function can then be wrapped in a Flow that runs up to heavyParallelism jobs at a time:

    val heavyParallelism = 4
    val heavyFlow : Flow[Item, String, _] =
      Flow[Item].mapAsync(heavyParallelism)(heavyJob)

    Finally, the Source and Flow can be attached to the Sink:

    val printSink = Sink.onComplete[String](t => println(s"Complete: ${t.isSuccess}"))
    itemSource.via(heavyFlow).runWith(printSink)