
Akka stream batching


I'm learning Akka Streams. I have a stream of records from Slick, many per time unit and already ordered by time, and I want to batch them into one group per time step for processing, starting a new group whenever the time changes.

Example

case class Record(time: Int, payload: String)

If the incoming stream is

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

I would like to transform this into

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
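Batch is not defined in the question. Assuming it is a simple case class like the one below (an assumption, as is the TimeBatching name), the grouping rule can be pinned down on an in-memory list before involving Akka: start a new batch whenever time differs from the previous record's time.

```scala
// Record is from the question; Batch is a hypothetical definition.
final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String])

object TimeBatching {
  // Groups consecutive records that share the same time.
  // Assumes the input is already ordered by time, as in the question;
  // a time value that reappears later simply starts another batch.
  def batches(records: Seq[Record]): Vector[Batch] =
    records.foldLeft(Vector.empty[Batch]) {
      // Same time step as the open (last) batch: append the payload to it.
      case (done :+ last, r) if last.time == r.time =>
        done :+ last.copy(payloads = last.payloads :+ r.payload)
      // First record, or the time step changed: open a new batch.
      case (done, r) =>
        done :+ Batch(r.time, Vector(r.payload))
    }

  def main(args: Array[String]): Unit = {
    val input = List(
      Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
      Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"),
      Record(3, "!")
    )
    batches(input).foreach(println)
    // Batch(1,Vector(a, k, k, a))
    // Batch(2,Vector(r, o, c, k, s))
    // Batch(3,Vector(!))
  }
}
```

Any streaming solution should produce the same sequence of batches as this function for the same ordered input.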

So far I've only found ways to group by a fixed number of records or to split into many substreams, but I don't think I need multiple substreams here.
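For comparison, the substream route does not have to leave you with multiple streams: splitWhen opens a new substream at each time change, and concatSubstreams flattens them back into one Source. This is a sketch assuming Akka 2.6+, with a hypothetical Batch class and batchByTime helper. The time-change flag is computed in statefulMapConcat so that the split predicate itself stays stateless.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String]) // hypothetical output type

object SplitWhenBatching {
  def batchByTime[M](records: Source[Record, M]): Source[Batch, M] =
    records
      // Tag each record with "starts a new time step?"; the state lives
      // in the factory, so each materialization gets its own copy.
      .statefulMapConcat { () =>
        var lastTime: Option[Int] = None
        record => {
          val startsNew = lastTime.exists(_ != record.time)
          lastTime = Some(record.time)
          List((record, startsNew))
        }
      }
      .splitWhen(_._2)                                     // new substream on each time change
      .fold(Vector.empty[Record])(_ :+ _._1)               // collect one time step per substream
      .map(rs => Batch(rs.head.time, rs.map(_.payload)))   // substreams are never empty
      .concatSubstreams                                    // flatten back into one Source

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("split-when")
    val input = Source(List(Record(1, "a"), Record(1, "k"), Record(2, "r"), Record(3, "!")))
    val batches = Await.result(batchByTime(input).runWith(Sink.seq), 10.seconds)
    batches.foreach(println)
    system.terminate()
  }
}
```

This uses more machinery than a single stateful stage, but the per-group aggregation stays a plain fold.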

Update: I found batch, but it only aggregates while the downstream is backpressuring, rather than batching all the time.


Solution

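  • statefulMapConcat is the multitool of the Akka Streams library. Its factory function runs once per materialization and can hold mutable state (here, the current time step and the payloads collected for it), and each incoming element can produce zero or more output elements.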

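    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source

    case class Record(time: Int, payload: String)

    implicit val system: ActorSystem = ActorSystem("batching")

    val records =
      Source(List(
        Record(1, "a"),
        Record(1, "k"),
        Record(1, "k"),
        Record(1, "a"),
        Record(2, "r"),
        Record(2, "o"),
        Record(2, "c"),
        Record(2, "k"),
        Record(2, "s"),
        Record(3, "!")
      ))

    records
      .map(Option(_))                              // wrap every record...
      .concat(Source.single(Option.empty[Record])) // ...so None can mark the end and flush the last group
      .statefulMapConcat { () =>
        // fresh state for every materialization of the stream
        var currentTime: Option[Int] = None        // None until the first record arrives
        var payloads = Vector.empty[String]

        elem => elem match {
          case Some(record) if currentTime.contains(record.time) =>
            payloads :+= record.payload            // same time step: keep collecting
            Nil
          case Some(record) =>
            // time step changed: emit the finished group (nothing before the first record)
            val finished = currentTime.map(t => (t, payloads)).toList
            currentTime = Some(record.time)
            payloads = Vector(record.payload)
            finished
          case None =>
            // end-of-stream marker: flush the last group
            currentTime.map(t => (t, payloads)).toList
        }
      }
      .runForeach(println)
    

    Running the above prints the following:

    (1,Vector(a, k, k, a))
    (2,Vector(r, o, c, k, s))
    (3,Vector(!))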
    

    You can adjust the example to print Batch objects.
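Newer Akka releases (check that your version has it; Apache Pekko offers it too under its own package names) add statefulMap, whose onComplete callback can emit the group that is still open when upstream finishes, so no sentinel element is needed. This is a sketch under that version assumption, emitting Batch objects; the Batch definition and the batchByTime name are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String]) // hypothetical output type

object StatefulMapBatching {
  // State S = the batch currently being filled (None before the first record).
  // Output T = Some(finished batch) when the time step changes, otherwise None.
  def batchByTime[M](records: Source[Record, M]): Source[Batch, M] =
    records
      .statefulMap[Option[Batch], Option[Batch]](() => None)(
        (open, record) =>
          open match {
            case Some(b) if b.time == record.time =>
              // Same time step: extend the open batch, emit nothing yet.
              (Some(b.copy(payloads = b.payloads :+ record.payload)), None)
            case _ =>
              // Time step changed (or first record): start a new batch
              // and emit the previous one, if there was one.
              (Some(Batch(record.time, Vector(record.payload))), open)
          },
        // On upstream completion, flush the batch that is still open.
        open => open.map(Some(_))
      )
      .collect { case Some(batch) => batch } // drop the None placeholders

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batching")
    val input = Source(List(Record(1, "a"), Record(1, "k"), Record(2, "r"), Record(3, "!")))
    val batches = Await.result(batchByTime(input).runWith(Sink.seq), 10.seconds)
    batches.foreach(println)
    system.terminate()
  }
}
```

statefulMap emits exactly one element per input, which is why the in-progress steps are represented as None and filtered out with collect.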