If I unzip a series of tuples, perform some asynchronous mutations on the two streams, and then re-zip them, does Akka guarantee the streams are re-zipped in the same order?
Example:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

// Split each (Int, String) pair, transform each half asynchronously,
// then zip the two halves back together.
val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val unzip = builder.add(Unzip[Int, String]())
  val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
  val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
  val zip = builder.add(Zip[Int, String]())

  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1

  FlowShape(unzip.in, zip.out)
})

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
  .via(graph)
  .runWith(Sink.seq)

Await.result(out, 1.second)
```
In this simple test, the output is `Vector((1,a-x), (2,b-x), (3,c-x))`, so things look good. But I'm not sure I can trust that this will always be the case.
Something that raises a bit of concern is this:
```scala
val unzip = builder.add(Unzip[Int, String]())
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
// Drops an element from the Int branch only, so the two branches fall out of step.
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String]())

unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1
// output: Vector((1,a-x), (3,b-x))
```
Even though each stream keeps its order, the original tuple relationship is lost: `3` (which came from `(2, "c")`) ends up paired with `b-x`.
I can manually inspect my flows to make sure there's no filtering logic. But with that done, can I be sure the tuples will be re-zipped in precisely the order they were received?
TL;DR: yes, it does. From the "Stream ordering" section of the Akka docs:
> In Akka Streams almost all computation operators preserve input order of elements. This means that if inputs `{IA1,IA2,...,IAn}` “cause” outputs `{OA1,OA2,...,OAk}` and inputs `{IB1,IB2,...,IBm}` “cause” outputs `{OB1,OB2,...,OBl}` and all of `IAi` happened before all `IBi` then `OAi` happens before `OBi`.
>
> This property is even upheld by async operations such as `mapAsync`, however an unordered version exists called `mapAsyncUnordered` which does not preserve this ordering.
>
> However, in the case of Junctions which handle multiple input streams (e.g. `Merge`) the output order is, in general, not defined for elements arriving on different input ports. That is a merge-like operation may emit `Ai` before emitting `Bi`, and it is up to its internal logic to decide the order of emitted elements. Specialized elements such as `Zip` however do guarantee their outputs order, as each output element depends on all upstream elements having been signalled already – thus the ordering in the case of zipping is defined by this property.
>
> If you find yourself in need of fine grained control over order of emitted elements in fan-in scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` – which gives you full control over how the merge is performed.
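To see the guarantee hold under less friendly timing than the question's quick test, here is a minimal sketch that reuses the question's unzip/zip shape but makes the futures complete out of order: on the Int branch later elements finish first, and the String branch gets random delays. It assumes Akka 2.6.x (`2.6.20` here) on Scala 2.13, run as a scala-cli script via the `//> using` directives; the `delayed` helper, the delay values, and the `item-n` inputs are hypothetical choices for illustration only.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random

// In Akka 2.6 an implicit ActorSystem is enough to materialize streams.
implicit val system: ActorSystem = ActorSystem("zip-ordering-demo")
import system.dispatcher

// Hypothetical helper: completes with `value` after `delayMs` ms without blocking a thread.
def delayed[T](delayMs: Int)(value: T): Future[T] =
  after(delayMs.millis, system.scheduler)(Future.successful(value))

// Same shape as the question's graph, with skewed completion times.
val graph: Flow[(Int, String), (Int, String), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[Int, String]())
    // Element n waits (10 - n) * 20 ms, so later elements complete first.
    val increment = builder.add(Flow[Int].mapAsync(10) { n => delayed((10 - n) * 20)(n + 1) })
    // Random delay per element; completion order is arbitrary.
    val append = builder.add(Flow[String].mapAsync(10) { s => delayed(Random.nextInt(100))(s"$s-x") })
    val zip = builder.add(Zip[Int, String]())

    unzip.out0 ~> increment ~> zip.in0
    unzip.out1 ~> append ~> zip.in1

    FlowShape(unzip.in, zip.out)
  })

val input = (0 until 10).map(n => (n, s"item-$n"))
val result = Await.result(Source(input).via(graph).runWith(Sink.seq), 10.seconds)

// mapAsync emits in input order on both branches, so every pair stays aligned.
println(result)

Await.result(system.terminate(), 10.seconds)
```

Because `mapAsync` re-emits results in input order regardless of which future finishes first, `Zip` always receives the n-th Int together with the n-th String. Swapping either branch to `mapAsyncUnordered` would let that branch emit in completion order, and the pairs could then be mismatched, much like the `filter` case in the question.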