Akka Streams Accumulate by Source Single


I am trying to use Akka Streams to accumulate data and process it in batches:

val myFlow: Flow[String, Unit, NotUsed] = Flow[String].collect {
    case record =>
      println(record)
      Future(record)
  }.mapAsync(1)(x => x).groupedWithin(3, 30 seconds)
    .mapAsync(10)(records =>
      someBatchOperation(records)
    )

I expected the code above not to perform any operation until 3 records are ready or 30 seconds have passed. But when I send a request with Source.single("test"), it processes this record immediately, without waiting for other records or for the 30 seconds.

How can I make this flow wait until more records arrive or 30 seconds elapse?

Records come from API requests one at a time, and I am trying to accumulate them in the flow like this:

Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)

Solution

  • It actually does that. Let's consider the following:

    Source(Stream.from(1))
      .throttle(1, 400.millis)
      .groupedWithin(3, 1.second)
      .runWith(Sink.foreach(i => println(s"Done with $i ${System.currentTimeMillis}")))
    

    The output of that line, until I killed the process, was:

    Done with Vector(1, 2, 3) 1599495716345
    Done with Vector(4, 5) 1599495717348
    Done with Vector(6, 7, 8) 1599495718330
    Done with Vector(9, 10) 1599495719350
    Done with Vector(11, 12, 13) 1599495720330
    Done with Vector(14, 15) 1599495721350
    Done with Vector(16, 17, 18) 1599495722328
    Done with Vector(19, 20) 1599495723348
    Done with Vector(21, 22, 23) 1599495724330
    

    As we can see, the gap from a 3-element batch to the following 2-element batch is a bit more than 1 second. That makes sense: the 2-element batch is emitted only when the 1 second window expires, and it then takes a little longer to reach the printing line.

    The gap from a 2-element batch to the following 3-element batch is less than a second, because the third element arrived before the window expired, so the group was emitted as soon as it was full.
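
    The gaps can be checked directly from the timestamps printed above. The following is a small sketch in plain Scala (no Akka needed); the object name BatchGaps is only illustrative, and the numbers are copied from the output above:

    ```scala
    object BatchGaps {
      // (batch size, timestamp in ms), copied from the output above.
      val batches: Vector[(Int, Long)] = Vector(
        3 -> 1599495716345L, 2 -> 1599495717348L, 3 -> 1599495718330L,
        2 -> 1599495719350L, 3 -> 1599495720330L, 2 -> 1599495721350L,
        3 -> 1599495722328L, 2 -> 1599495723348L, 3 -> 1599495724330L
      )

      // For every batch after the first: (its size, ms elapsed since the previous batch).
      val gaps: Vector[(Int, Long)] =
        batches.zip(batches.tail).map { case ((_, prev), (size, ts)) => size -> (ts - prev) }

      def main(args: Array[String]): Unit =
        gaps.foreach { case (size, gap) =>
          println(s"batch of $size arrived $gap ms after the previous one")
        }
    }
    ```

    Every 2-element batch arrives slightly more than 1000 ms after the previous batch (the timer fired), and every 3-element batch arrives slightly less than 1000 ms after the previous one (the size limit was reached first).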

    Why didn't it work in your example?

    When you use Source.single, the source emits its one element and then completes the stream. You can see it in the source code of akka. Because the upstream has completed, the groupedWithin stage knows that it won't get any more elements, so it emits the "test" string right away instead of waiting. To actually test this flow, feed it from a longer-lived stream.
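
    A minimal sketch of this behaviour, assuming Akka 2.6 (where an implicit ActorSystem is enough to run a stream). The object name SingleCompletesSketch and the 10 second await are illustrative choices, not part of the original code:

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object SingleCompletesSketch {
      // Runs a one-element source through groupedWithin and returns
      // the emitted groups together with the elapsed time in ms.
      def run(): (Seq[Seq[String]], Long) = {
        implicit val system: ActorSystem = ActorSystem("single-sketch")
        try {
          val start = System.nanoTime()
          // The await (10 s) is shorter than the window (30 s): if groupedWithin
          // waited for the timer, this would fail with a TimeoutException.
          val groups = Await.result(
            Source.single("test").groupedWithin(3, 30.seconds).runWith(Sink.seq),
            10.seconds
          )
          (groups, (System.nanoTime() - start) / 1000000)
        } finally system.terminate()
      }

      def main(args: Array[String]): Unit = {
        val (groups, elapsedMs) = run()
        println(s"groups = $groups after $elapsedMs ms")
      }
    }
    ```

    The single group containing "test" should come back long before the 30 second window expires, because the completion of Source.single flushes the pending group.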

    Source(1 to 10) does not help either: it is a finite source (internally it translates into Source.single), so it completes the stream after its last element as well. We can see that here.
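
    Since every finite source completes, one possible way to get the batching the question asks for is to materialize the stream once and push each API record into it, rather than running a new Source.single per request. The following is only a sketch under assumptions: it assumes Akka 2.6, uses Source.queue as the entry point, and the object name BatchingQueueSketch, the buffer size of 100 and the Sink.seq collector (standing in for someBatchOperation) are all hypothetical choices for the demo:

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Keep, Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object BatchingQueueSketch {
      def run(): Seq[Seq[String]] = {
        implicit val system: ActorSystem = ActorSystem("batching-sketch")
        try {
          // Materialize ONE long-lived stream. The queue stays open between
          // requests, so groupedWithin does not see an early completion.
          val (queue, batches) = Source
            .queue[String](100, OverflowStrategy.backpressure) // 100 = assumed buffer size
            .groupedWithin(3, 30.seconds)
            .toMat(Sink.seq)(Keep.both) // Sink.seq stands in for the real batch operation
            .run()

          // Each API request would offer its record here
          // instead of calling Source.single(record).via(myFlow).
          List("a", "b", "c", "d").foreach(r => Await.result(queue.offer(r), 5.seconds))

          // Demo only: completing the queue flushes the incomplete last group,
          // so the sketch does not have to wait 30 seconds for "d".
          queue.complete()
          Await.result(batches, 5.seconds)
        } finally system.terminate()
      }

      def main(args: Array[String]): Unit =
        println(run())
    }
    ```

    With these four records, the first group should be emitted as soon as "c" arrives, and "d" is only released when the queue completes. In a real service the queue would normally stay open, and "d" would be emitted when the 30 second window expires.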