scala, akka, akka-stream

Akka Streams: a stateful flow per substream


I have a stateful flow:

val stream = Flow[Event].statefulMapConcat {
  () =>

    val state = ...

    {
      element =>
        // change the state
        element :: Nil
    }
}

and it is used as part of a larger flow:

Flow[Event]
  .groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
  .via(stream)
  .mergeSubstreams

Is there any way to keep the state in stream separate for each substream (in this example, one state per key after the groupBy)? I think the flow should be materialized once per substream, but I don't know how to do that.


Solution

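  • That setup already gives you one state per substream. groupBy materializes the operators that follow it separately for each substream, so the factory function passed to statefulMapConcat runs once per key. For example: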

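      import akka.actor.ActorSystem
      import akka.stream.scaladsl.{Flow, Source}

      // Akka 2.6+: an implicit ActorSystem provides the materializer.
      // The system name is arbitrary.
      implicit val system: ActorSystem = ActorSystem("example")

      val stream = Flow[Int].statefulMapConcat { () =>
        // Created anew for every materialization, i.e. once per substream
        var state: List[Int] = Nil

        element => {
          state = element :: state
          List(state) // emit everything seen so far in this substream
        }
      }

      val groupByFlow =
        Flow[Int]
          .groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
          .via(stream)
          .mergeSubstreams

      Source(List(1, 1, 2, 3, 3, 3))
        .via(groupByFlow)
        .runForeach(i => println(i))
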
    

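    This prints the following (order is kept within a substream, but the interleaving between substreams may vary from run to run):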

    List(1)
    List(1, 1)
    List(3)
    List(2)
    List(3, 3)
    List(3, 3, 3)
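
To see why the state ends up being per key, here is a minimal plain-Scala sketch with no Akka involved. It is only a model of the behaviour, not how Akka implements groupBy: the object name PerSubstreamStateModel and the run helper are made up for illustration, and a mutable map stands in for the substreams. The factory has the same shape as the function passed to statefulMapConcat, and it is called once for each new key, so every key gets its own state variable.

```scala
import scala.collection.mutable

// Hypothetical model, not Akka code: a map from key to handler plays
// the role of the substreams created by groupBy.
object PerSubstreamStateModel {

  // Same shape as the function given to statefulMapConcat:
  // calling the factory creates a fresh `state` captured by the handler.
  val factory: () => Int => List[List[Int]] = () => {
    var state: List[Int] = Nil
    (element: Int) => {
      state = element :: state
      List(state)
    }
  }

  // Route each element to the handler for its key (here the key is the
  // element itself, as with `identity` in the example above). The factory
  // is invoked only the first time a key is seen.
  def run(input: List[Int]): List[List[Int]] = {
    val handlers = mutable.Map.empty[Int, Int => List[List[Int]]]
    input.flatMap { element =>
      val handler = handlers.getOrElseUpdate(element, factory())
      handler(element)
    }
  }
}
```

Because this model processes elements sequentially, PerSubstreamStateModel.run(List(1, 1, 2, 3, 3, 3)) yields the results in input order, unlike the real stream where substreams are merged as their elements become available.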