I'm trying to implement batch processing. My algorithm:
1) First I need to request items from the db, starting with skip = 0. If there are no items, stop processing completely.
case class Item(i: Int)
def getItems(skip: Int): Future[Seq[Item]] = {
Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
}
2) Then, for every item, run a heavy job (parallelism = 4):
def heavyJob(item: Item): Future[String] = Future {
Thread.sleep(1000)
item.i.toString + " done"
}
3) After all items have been processed, go back to step 1 with skip += 100.
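For reference, the three steps can also be written with plain `scala.concurrent` Futures and no streaming library. This is a minimal sketch under stated assumptions: a fixed pool of 4 threads is what enforces "parallelism = 4", `heavyJob` sleeps 10 ms instead of 1 s so it finishes quickly, and the names `PagedBatchSketch`, `processFrom` and `run` are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PagedBatchSketch {
  case class Item(i: Int)

  val pageSize = 100

  // Assumption: a fixed pool of 4 threads, so at most 4 heavy jobs run at once.
  private val pool = Executors.newFixedThreadPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Same fake data source as in the question: pages are non-empty while skip < 756.
  def getItems(skip: Int): Future[Seq[Item]] =
    Future((skip until (skip + (if (skip < 756) pageSize else 0))).map(i => Item(i)))

  // Sleep shortened from 1 s to 10 ms for the sketch.
  def heavyJob(item: Item): Future[String] = Future {
    Thread.sleep(10)
    item.i.toString + " done"
  }

  // Step 1: fetch a page and stop if it is empty.
  // Step 2: run heavyJob on every item of that page.
  // Step 3: only after the whole page is done, continue with skip + pageSize.
  def processFrom(skip: Int, acc: Vector[String] = Vector.empty): Future[Vector[String]] =
    getItems(skip).flatMap { items =>
      if (items.isEmpty) Future.successful(acc)
      else Future.traverse(items)(heavyJob).flatMap(done => processFrom(skip + pageSize, acc ++ done))
    }

  // Blocks the calling thread (not the pool) until every page is processed.
  def run(): Vector[String] =
    try Await.result(processFrom(0), 1.minute)
    finally pool.shutdown()
}
```

This version waits for a whole page before fetching the next one, exactly as step 3 describes; `Future.traverse` keeps the results in item order.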
What I'm trying:
val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))
val flattened: Source[Item, _] = dbSource.mapConcat(identity)
val procced: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))
procced.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))
But I don't know how to implement the pagination.
The skip incrementing can be handled with an Iterator as the underlying source of values:
val skipIncrement = 100
val skipIterator: () => Iterator[Int] =
  () => Iterator.from(0, skipIncrement)
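`skipIterator` is a factory rather than a single Iterator. That matters because `Source.fromIterator` takes a `() => Iterator[T]`: an Iterator is single-use, so each materialization of the stream needs a fresh one. A small sketch of the values it yields (the object name `SkipIteratorDemo` and `firstSkips` are hypothetical):

```scala
object SkipIteratorDemo {
  val skipIncrement = 100

  // Each call of the factory starts again from 0.
  val skipIterator: () => Iterator[Int] = () => Iterator.from(0, skipIncrement)

  // Take the first n skip values from a fresh iterator.
  def firstSkips(n: Int): List[Int] = skipIterator().take(n).toList
}
```

`SkipIteratorDemo.firstSkips(4)` gives `List(0, 100, 200, 300)`, and calling it again gives the same list because a new iterator is created each time.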
This Iterator can then be used to drive an Akka Source that fetches the items and keeps going until a query returns an empty Seq:
val databaseStillHasValues: Seq[Item] => Boolean =
  dbValues => dbValues.nonEmpty
val itemSource: Source[Item, _] =
  Source.fromIterator(skipIterator)
    .mapAsync(1)(getItems)
    .takeWhile(databaseStillHasValues)
    .mapConcat(_.toList) // an immutable List is accepted by mapConcat on Scala 2.12 as well as 2.13
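The same shape (skip values, then pages, then stop at the first empty page, then flatten) can be checked synchronously with a plain Scala Iterator, which is lazy in the same way the stream is demand-driven. In this sketch `getItemsSync` is a hypothetical blocking stand-in for `getItems`, and the `queries` counter exists only to show how many lookups happen:

```scala
object SyncPaginationAnalogue {
  case class Item(i: Int)

  // Counts how many "database queries" are issued.
  var queries = 0

  // Hypothetical synchronous stand-in for getItems, same paging rule as the question.
  def getItemsSync(skip: Int): Seq[Item] = {
    queries += 1
    (skip until (skip + (if (skip < 756) 100 else 0))).map(i => Item(i))
  }

  // skips -> pages -> stop at the first empty page -> individual items
  def allItems(): List[Item] =
    Iterator.from(0, 100).map(getItemsSync).takeWhile(_.nonEmpty).flatMap(page => page).toList
}
```

With this fake data it returns 800 items (skips 0 to 700 each yield a full page) and issues 9 queries: the eight non-empty pages plus the empty one at skip 800 that ends the iteration. Note that termination relies only on an empty page, not on a short one.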
The heavyJob can be used within a Flow:
val heavyParallelism = 4
val heavyFlow : Flow[Item, String, _] =
Flow[Item].mapAsync(heavyParallelism)(heavyJob)
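`mapAsync(4)` keeps at most four futures in flight and emits results in upstream order, even when a later future completes first (Akka also offers `mapAsyncUnordered` when order does not matter). A small sketch that checks both properties, assuming Akka 2.6+ (where an implicit `ActorSystem` provides the Materializer); `MapAsyncDemo` and `job` are hypothetical, and later elements deliberately sleep for less time:

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncDemo {
  def run(): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("mapAsyncDemo")
    import system.dispatcher // ExecutionContext for the futures

    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Later elements finish sooner (48 ms down to 10 ms).
    def job(i: Int): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      Thread.sleep(50 - i * 2)
      inFlight.decrementAndGet()
      i
    }

    val flow = Flow[Int].mapAsync(4)(job)
    try {
      val out = Await.result(Source(1 to 20).via(flow).runWith(Sink.seq), 30.seconds)
      (out, maxInFlight.get)
    } finally system.terminate()
  }
}
```

The output comes back as 1 to 20 in order, and the observed maximum concurrency never exceeds 4.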
Finally, the Source and Flow can be attached to the Sink:
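val printSink = Sink.foreach[String](println)
itemSource.via(heavyFlow)
  .runWith(printSink) // needs an implicit Materializer (in Akka 2.6+, an implicit ActorSystem)
  .onComplete(t => println(s"Complete: ${t.isSuccess}")) // needs an implicit ExecutionContext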
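Putting the pieces together, a self-contained sketch might look like the following. It assumes Akka 2.6+, shortens `heavyJob`'s sleep from 1 s to 5 ms, and collects results with `Sink.seq` instead of printing them so the outcome can be checked; the object name `PaginatedStream` and `run` are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PaginatedStream {
  case class Item(i: Int)

  def run(): Seq[String] = {
    // Assumes Akka 2.6+: an implicit ActorSystem also provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("pagination")
    import system.dispatcher // ExecutionContext for the Futures below

    def getItems(skip: Int): Future[Seq[Item]] =
      Future((skip until (skip + (if (skip < 756) 100 else 0))).map(i => Item(i)))

    // Sleep shortened from 1 s to 5 ms for the sketch.
    def heavyJob(item: Item): Future[String] = Future {
      Thread.sleep(5)
      item.i.toString + " done"
    }

    val itemSource: Source[Item, NotUsed] =
      Source.fromIterator(() => Iterator.from(0, 100))
        .mapAsync(1)(getItems) // one page query at a time, in order
        .takeWhile(_.nonEmpty) // complete the stream at the first empty page
        .mapConcat(_.toList)   // page -> individual items

    val heavyFlow: Flow[Item, String, NotUsed] = Flow[Item].mapAsync(4)(heavyJob)

    try Await.result(itemSource.via(heavyFlow).runWith(Sink.seq), 1.minute)
    finally system.terminate()
  }
}
```

With the question's fake data this yields 800 results, from "0 done" to "799 done". Because `heavyJob` blocks a thread, a real application would usually give it a separate ExecutionContext instead of `system.dispatcher`.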