How to implement a stream with skip and conditional stop


I am trying to implement batch processing. My algorithm:

1) First, I need to request items from the database, with an initial skip = 0. If no items are returned, processing stops completely.

  case class Item(i: Int)

  def getItems(skip: Int): Future[Seq[Item]] = {
    Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
  }

2) Then, for every item, run a heavy job (parallelism = 4).

  def heavyJob(item: Item): Future[String] = Future {
    Thread.sleep(1000)
    item.i.toString + " done"
  }

3) After all items have been processed, go back to step 1 with skip += 100.

What I have tried:

val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))

val flattened: Source[Item, _] = dbSource.mapConcat(identity)

val procced: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))

procced.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))

But I don't know how to implement the pagination.


Solution

  • The skip incrementing can be handled with an Iterator as the underlying source of values:

    val skipIncrement = 100
    
    val skipIterator : () => Iterator[Int] = 
      () => Iterator from (0, skipIncrement)
    

    This Iterator can then be used to drive an akka Source that fetches the items and continues processing until a query returns an empty Seq:

    val databaseStillHasValues : Seq[Item] => Boolean = 
      (dbValues) => !dbValues.isEmpty
    
    val itemSource : Source[Item, _] = 
      Source.fromIterator(skipIterator)
            .mapAsync(1)(getItems)
            .takeWhile(databaseStillHasValues)
            .mapConcat(_.toList) // mapConcat expects an immutable collection
    

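The stop condition can be checked without Akka. The following is a minimal sketch that mirrors the same skip / takeWhile / flatten logic with plain Scala iterators. `PagingStopDemo` and `getItemsSync` are hypothetical names introduced here, and `getItemsSync` is a synchronous stand-in for the `getItems` function from the question.

```scala
object PagingStopDemo {
  case class Item(i: Int)

  // Synchronous stand-in for getItems, using the same page logic as the question.
  def getItemsSync(skip: Int): Seq[Item] =
    (skip until (skip + (if (skip < 756) 100 else 0))).map(Item(_))

  // Lazily walk skip = 0, 100, 200, ... and stop at the first empty page.
  def allItems: List[Item] =
    Iterator.from(0, 100)
      .map(getItemsSync)
      .takeWhile(_.nonEmpty)
      .flatMap(_.iterator)
      .toList

  def main(args: Array[String]): Unit = {
    val items = allItems
    // prints "800 items, last = Item(799)"
    println(s"${items.size} items, last = ${items.last}")
  }
}
```

Pages are returned for skip = 0 through 700. The query for skip = 800 comes back empty, so `takeWhile` ends the iteration there and no further pages are requested.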
    The heavyJob can be used within a Flow:

    val heavyParallelism = 4
    
    val heavyFlow : Flow[Item, String, _] = 
      Flow[Item].mapAsync(heavyParallelism)(heavyJob)
    

    Finally, the Source and Flow can be attached to the Sink:

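    val printSink = Sink.foreach[String](println)
    
    // runWith(printSink) materializes a Future[Done] that completes when the stream ends;
    // onComplete needs an implicit ExecutionContext in scope
    itemSource.via(heavyFlow)
              .runWith(printSink)
              .onComplete(t => println(s"Complete: ${t.isSuccess}"))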
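
As an alternative to the Iterator plus takeWhile combination, the pagination state can be carried by `Source.unfoldAsync`. The sketch below is not part of the original answer. It assumes Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer), the object name `UnfoldPagination` is hypothetical, and `heavyJob` is simplified by dropping the sleep so that the example finishes quickly.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+, where the implicit
// ActorSystem also provides the stream materializer.
object UnfoldPagination {
  case class Item(i: Int)
  val pageSize = 100

  // Stand-in for the database query from the question.
  def getItems(skip: Int)(implicit ec: ExecutionContext): Future[Seq[Item]] =
    Future((skip until (skip + (if (skip < 756) pageSize else 0))).map(Item(_)))

  // Simplified heavyJob without the sleep, so the sketch finishes quickly.
  def heavyJob(item: Item)(implicit ec: ExecutionContext): Future[String] =
    Future(s"${item.i} done")

  def run()(implicit system: ActorSystem): Seq[String] = {
    import system.dispatcher
    // The unfold state is the current skip; returning None completes the stream.
    val itemSource = Source
      .unfoldAsync[Int, List[Item]](0) { skip =>
        getItems(skip).map { items =>
          if (items.isEmpty) None else Some((skip + pageSize, items.toList))
        }
      }
      .mapConcat(identity)

    val done = itemSource.mapAsync(4)(heavyJob(_)).runWith(Sink.seq)
    Await.result(done, 30.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("pagination")
    try println(s"Processed ${run().size} items")
    finally system.terminate()
  }
}
```

With the stand-in `getItems`, this prints "Processed 800 items". The design difference is that the next skip value and the stop decision live in one function, so no separate iterator or takeWhile predicate is needed.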