There is a stateful flow:
val stream = Flow[Event].statefulMapConcat {
() =>
val state = ...
{
element =>
// change the state
element :: Nil
}
}
and it is a part of the flow
Flow[Event]
.groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Is there any way to have a state
in stream
per substream (in this example per key after the groupBy)?
I think it should be materialised per sub-stream, but don't know how to do that.
You do get a state per substream in that setup:
val stream = Flow[Int].statefulMapConcat {
() => {
var state: List[Int] = Nil
element => {
state = element :: state
List(state)
}
}
}
val groupByFlow =
Flow[Int]
.groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Source(List(1,1,2,3,3,3))
.via(groupByFlow)
.runForeach(i => println(i))
will print
List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)