Could you guys explain how to use new groupBy
in akka-streams ? Documentation seems to be quite useless. groupBy
used to return (T, Source)
but not anymore. Here is my example (I mimicked one from docs):
Source(List(
1 -> "1a", 1 -> "1b", 1 -> "1c",
2 -> "2a", 2 -> "2b",
3 -> "3a", 3 -> "3b", 3 -> "3c",
4 -> "4a",
5 -> "5a", 5 -> "5b", 5 -> "5c",
6 -> "6a", 6 -> "6b",
7 -> "7a",
8 -> "8a", 8 -> "8b",
9 -> "9a", 9 -> "9b",
))
.groupBy(3, _._1)
.map { case (aid, raw) =>
aid -> List(raw)
}
.reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
(l._1, l._2 ::: r._2)
}
.mergeSubstreams
.runForeach { case (aid: Int, items: List[String]) =>
println(s"$aid - ${items.length}")
}
This simply hangs. Perhaps it hangs because number of substreams is lower than number of unique keys. But what should I do if I have infinite stream ? I'd like to group until key changes.
In my real stream data is always sorted by value I'm grouping by. Perhaps I don't need groupBy
at all ?
I ended up implementing custom stage
class GroupAfterKeyChangeStage[K, T](keyForItem: T ⇒ K, maxBufferSize: Int) extends GraphStage[FlowShape[T, List[T]]] {
private val in = Inlet[T]("GroupAfterKeyChangeStage.in")
private val out = Outlet[List[T]]("GroupAfterKeyChangeStage.out")
override val shape: FlowShape[T, List[T]] =
FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val buffer = new ListBuffer[T]
private var currentKey: Option[K] = None
// InHandler
override def onPush(): Unit = {
val nextItem = grab(in)
val nextItemKey = keyForItem(nextItem)
if (currentKey.forall(_ == nextItemKey)) {
if (currentKey.isEmpty)
currentKey = Some(nextItemKey)
if (buffer.size == maxBufferSize)
failStage(new RuntimeException(s"Maximum buffer size is exceeded on key $nextItemKey"))
else {
buffer += nextItem
pull(in)
}
} else {
val result = buffer.result()
buffer.clear()
buffer += nextItem
currentKey = Some(nextItemKey)
push(out, result)
}
}
// OutHandler
override def onPull(): Unit = {
if (isClosed(in))
failStage(new RuntimeException("Upstream finished but there was a truncated final frame in the buffer"))
else
pull(in)
}
// InHandler
override def onUpstreamFinish(): Unit = {
val result = buffer.result()
if (result.nonEmpty) {
emit(out, result)
completeStage()
} else
completeStage()
// else swallow the termination and wait for pull
}
override def postStop(): Unit = {
buffer.clear()
}
setHandlers(in, out, this)
}
}
If you don't want to copy-paste it I've added it to helper library that I maintain. In order to use you need to add
Resolver.bintrayRepo("cppexpert", "maven")
to your resolvers. Add add foolowingto your dependencies
"com.walkmind" %% "scala-tricks" % "2.15"
It's implemented in com.walkmind.akkastream.FlowExt
as flow
def groupSortedByKey[K, T](keyForItem: T ⇒ K, maxBufferSize: Int): Flow[T, List[T], NotUsed]
For my example it would be
source
.via(FlowExt.groupSortedByKey(_._1, 128))