Search code examples
scalaakka-stream

Akka flow using fold not completing


I have some code that looks roughly like the following: where A is a Tuple of two Maps

def methodName(): Flow[A, B, NotUsed] = {
  val filter = Flow[A].map(a => a._2.slice(0, 2))
  val split = Flow[A._2]
    .mapConcat(identity)
    .map(t => {
      B.random
    })
    .fold(B.empty)((a, b) => {
      new B(a._1, a._2 ++ Seq(b._1), a._3 ++ Seq(b._2), a._4)
    })

  val logK = Flow[B].log("K", c => {
    log.info("here")
  })
  filter.via(split).via(logK)
}

But when I run this, the stream stalls at the fold stage and I'm not understanding why. I can confirm that the collection in A._2 is being fully exhausted and when i replace the fold with a different operation, the flow proceeds and isn't blocked. So as far as I know, the completeStage is being called by the upstream mapConcat. So i'm not sure why the fold stage isn't getting that call and knowing to proceed to the next stage.


Solution

  • So it seems that this is a bug in the version of akka I was using: akka: "2.5.23", akkaHttp: "10.1.10"

    When i upgraded to akka: "2.6.8" and akkaHttpV = "10.2.0" it all works as expected