
How to implement pagination with akka-streams


I need to process a large file line by line and do some heavy work on every line (on a 4-core CPU). I think this code is correct:

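import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString
import scala.concurrent.Future

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future below

val sink = Sink.foreach[String](elem => println("element proceed"))

FileIO.fromPath(Paths.get("file.txt"))
  // split the bytes into lines; fails if a line exceeds 64 bytes
  // or if the file does not end with "\n"
  .via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
  .mapAsync(4)(v =>
    // long op: simulated with a blocking sleep, at most 4 in flight
    Future {
      Thread.sleep(500)
      "updated_" + v
    })
  .to(sink)
  .run()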

But I want output like this:

100 element proceed
200 element proceed
300 element proceed
357 element proceed. done

How can I implement this?


Solution

  • You can use Flow.grouped:

    val groupSize = 100
    
    val groupedFlow = Flow[String].grouped(groupSize)
    

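    This Flow can be placed before or after your mapAsync. If it goes before, mapAsync receives a Seq[String] (one group of lines) per call instead of a single line: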

    FileIO.fromPath(Paths.get("file.txt"))
          .via(Framing.delimiter(ByteString("\n"), 64).map(_.utf8String))
          .via(groupedFlow)
      ...
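A possible end-to-end sketch that builds on grouped to print exactly the output asked for in the question. It places grouped after mapAsync, turns each group into its size, and keeps a running total with scan. Because a stream cannot know that an element is the last one until upstream completes, it appends a None end marker and uses sliding(2) as a one-element look-ahead, so the final total gets the ". done" suffix. Assumptions not taken from the thread: Akka 2.6 or later (the implicit ActorSystem supplies the materializer), a hypothetical object name ProgressReport, and a maximum line length of 1024 bytes. The Option/sliding look-ahead is one technique among several, not the answerer's method.

```scala
import java.nio.file.Paths
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical entry point; assumes Akka 2.6+ (materializer derived from the implicit system).
object ProgressReport extends App {
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  import system.dispatcher // ExecutionContext for the Futures below

  val groupSize = 100

  val done: Future[Done] =
    FileIO.fromPath(Paths.get("file.txt"))
      // Assumed: no line exceeds 1024 bytes; allowTruncation accepts a last line without "\n".
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map(_.utf8String)
      // Simulated heavy work: at most 4 lines in flight, output order preserved.
      .mapAsync(4)(line => Future { Thread.sleep(500); "updated_" + line })
      .grouped(groupSize)                        // batches of up to 100 processed lines
      .map(_.size)                               // e.g. 100, 100, 100, 57 for a 357-line file
      .scan(0)(_ + _)                            // running total: 0, 100, 200, 300, 357
      .drop(1)                                   // drop scan's initial 0
      .map(Option(_))                            // wrap totals so an end marker can follow
      .concat(Source.single(Option.empty[Int]))  // None marks the end of the stream
      .sliding(2)                                // look one element ahead
      .collect {
        case Seq(Some(n), None)    => s"$n element proceed. done"
        case Seq(Some(n), Some(_)) => s"$n element proceed"
      }
      .runWith(Sink.foreach[String](println))

  // Shut the actor system down once the stream finishes, successfully or not.
  done.onComplete {
    case Success(_) => system.terminate()
    case Failure(e) =>
      println(s"stream failed: $e")
      system.terminate()
  }
}
```

If the exact "done" suffix is not important, a simpler variant drops the map(Option(_)), concat and sliding steps, prints each running total directly, and prints "done" from the Success branch of onComplete.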