
How do I group items of sorted stream with SubFlows?


Could you guys explain how to use the new groupBy in akka-streams? The documentation seems to be quite useless. groupBy used to return (T, Source), but not anymore. Here is my example (I mimicked one from the docs):

Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
    (l._1, l._2 ::: r._2)
  }
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }

This simply hangs. Perhaps it hangs because the number of substreams (3) is lower than the number of unique keys (9). But what should I do if I have an infinite stream? I'd like to group until the key changes.

In my real stream, the data is always sorted by the value I'm grouping by. Perhaps I don't need groupBy at all?
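To pin down what "group until the key changes" means for input that is already sorted by key, here is a minimal plain-Scala sketch on an ordinary List, with no Akka involved. The names `GroupConsecutive` and `groupConsecutive` are hypothetical and chosen only for illustration. The sketch assumes equal keys are adjacent, so a key that reappears later starts a new group.

```scala
object GroupConsecutive {
  // Splits `items` into runs of adjacent elements that share the same key.
  // Assumes equal keys are contiguous (e.g. the input is sorted by key);
  // a key that reappears after a different key starts a new group.
  def groupConsecutive[K, T](items: List[T])(key: T => K): List[(K, List[T])] =
    items match {
      case Nil => Nil
      case head :: _ =>
        val k = key(head)
        // `span` takes the longest prefix with key k and returns the remainder
        val (run, rest) = items.span(item => key(item) == k)
        (k, run) :: groupConsecutive(rest)(key)
    }

  // Same data as in the question above
  val data: List[(Int, String)] = List(
    1 -> "1a", 1 -> "1b", 1 -> "1c",
    2 -> "2a", 2 -> "2b",
    3 -> "3a", 3 -> "3b", 3 -> "3c",
    4 -> "4a",
    5 -> "5a", 5 -> "5b", 5 -> "5c",
    6 -> "6a", 6 -> "6b",
    7 -> "7a",
    8 -> "8a", 8 -> "8b",
    9 -> "9a", 9 -> "9b"
  )

  def main(args: Array[String]): Unit =
    // prints "1 - 3", "2 - 2", ..., "9 - 2", one line per key
    groupConsecutive(data)(_._1).foreach { case (aid, items) =>
      println(s"$aid - ${items.length}")
    }
}
```

Because each group is closed as soon as a different key appears, no per-key state (like groupBy's substreams) has to stay open, which is why sorted input does not need groupBy at all.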


Solution

  • I ended up implementing a custom stage:

    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
    
    import scala.collection.mutable.ListBuffer
    
    // Emits a List of consecutive elements that share the same key as soon as the key changes.
    // Assumes the input is sorted (grouped) by key; fails if one group exceeds maxBufferSize elements.
    class GroupAfterKeyChangeStage[K, T](keyForItem: T => K, maxBufferSize: Int) extends GraphStage[FlowShape[T, List[T]]] {
    
      private val in = Inlet[T]("GroupAfterKeyChangeStage.in")
      private val out = Outlet[List[T]]("GroupAfterKeyChangeStage.out")
    
      override val shape: FlowShape[T, List[T]] =
        FlowShape(in, out)
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
    
        // Elements of the group currently being collected
        private val buffer = new ListBuffer[T]
        // Key of the group currently being collected (None before the first element)
        private var currentKey: Option[K] = None
    
        // InHandler
        override def onPush(): Unit = {
          val nextItem = grab(in)
          val nextItemKey = keyForItem(nextItem)
    
          if (currentKey.forall(_ == nextItemKey)) {
            // Same key (or the very first element): keep collecting
            if (currentKey.isEmpty)
              currentKey = Some(nextItemKey)
    
            if (buffer.size == maxBufferSize)
              failStage(new RuntimeException(s"Maximum buffer size is exceeded on key $nextItemKey"))
            else {
              buffer += nextItem
              pull(in)
            }
          } else {
            // Key changed: emit the finished group and start a new one with nextItem.
            // No pull here; the next onPull requests more input.
            val result = buffer.toList
            buffer.clear()
            buffer += nextItem
            currentKey = Some(nextItemKey)
            push(out, result)
          }
        }
    
        // OutHandler
        override def onPull(): Unit =
          pull(in)
    
        // InHandler
        override def onUpstreamFinish(): Unit = {
          val result = buffer.toList
          // Flush the last group, and complete only after it has actually been emitted
          if (result.nonEmpty)
            emit(out, result, () => completeStage())
          else
            completeStage()
        }
    
        override def postStop(): Unit =
          buffer.clear()
    
        setHandlers(in, out, this)
      }
    }
    

    If you don't want to copy-paste it, I've added it to a helper library that I maintain. To use it, add

    Resolver.bintrayRepo("cppexpert", "maven")
    

    to your resolvers (note that JFrog shut down Bintray in 2021, so this repository may no longer resolve) and add the following to your dependencies:

    "com.walkmind" %% "scala-tricks" % "2.15"
    

    It's implemented in com.walkmind.akkastream.FlowExt as the flow

    def groupSortedByKey[K, T](keyForItem: T ⇒ K, maxBufferSize: Int): Flow[T, List[T], NotUsed]
    

    For my example, it would be:

    source
      .via(FlowExt.groupSortedByKey[Int, (Int, String)](_._1, 128))
      .runForeach(items => println(s"${items.head._1} - ${items.length}"))
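The behaviour of GroupAfterKeyChangeStage can be modelled without Akka as a lazy Iterator transformation, which makes the edge cases easy to check by hand. This is a hypothetical sketch: `GroupAfterKeyChangeModel` is not part of the answer or of scala-tricks. Like the stage, it emits a group only once it sees the first element of the next key (or the end of input), flushes the final group, and fails when a single group would exceed `maxBufferSize` elements.

```scala
object GroupAfterKeyChangeModel {
  // Plain-Scala model of GroupAfterKeyChangeStage (hypothetical, no Akka).
  // Lazily yields one List per run of equal keys; throws if a run is longer than maxBufferSize.
  def groupAfterKeyChange[K, T](input: Iterator[T], maxBufferSize: Int)(keyForItem: T => K): Iterator[List[T]] =
    new Iterator[List[T]] {
      // A buffered iterator lets us peek at the next element without consuming it
      private val in = input.buffered

      override def hasNext: Boolean = in.hasNext

      override def next(): List[T] = {
        if (!in.hasNext) throw new NoSuchElementException("no more groups")
        val currentKey = keyForItem(in.head)
        val buffer = List.newBuilder[T]
        var size = 0
        // Collect elements while the key stays the same
        while (in.hasNext && keyForItem(in.head) == currentKey) {
          if (size == maxBufferSize)
            throw new RuntimeException(s"Maximum buffer size is exceeded on key $currentKey")
          buffer += in.next()
          size += 1
        }
        buffer.result()
      }
    }

  def main(args: Array[String]): Unit = {
    val groups = groupAfterKeyChange(Iterator(1 -> "1a", 1 -> "1b", 2 -> "2a"), maxBufferSize = 128)(_._1)
    // prints List((1,1a), (1,1b)) and then List((2,2a))
    groups.foreach(g => println(g))
  }
}
```

Because only one group is held in memory at a time, this also works on an infinite iterator as long as each run of equal keys is finite: for example, `Iterator.from(0).map(_ / 3)` yields `List(0, 0, 0)`, `List(1, 1, 1)`, `List(2, 2, 2)` and so on.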