I have some code that looks roughly like the following: where A is a Tuple of two Maps
def methodName(): Flow[A, B, NotUsed] = {
val filter = Flow[A].map(a => a._2.slice(0, 2))
val split = Flow[A._2]
.mapConcat(identity)
.map(t => {
B.random
})
.fold(B.empty)((a, b) => {
new B(a._1, a._2 ++ Seq(b._1), a._3 ++ Seq(b._2), a._4)
})
val logK = Flow[B].log("K", c => {
log.info("here")
})
filter.via(split).via(logK)
}
But when I run this, the stream stalls at the fold stage and I'm not understanding why. I can confirm that the collection in A._2 is being fully exhausted and when i replace the fold with a different operation, the flow proceeds and isn't blocked. So as far as I know, the completeStage is being called by the upstream mapConcat. So i'm not sure why the fold stage isn't getting that call and knowing to proceed to the next stage.
So it seems that this is a bug in the version of akka I was using: akka: "2.5.23", akkaHttp: "10.1.10"
When i upgraded to akka: "2.6.8" and akkaHttpV = "10.2.0" it all works as expected