
Does Akka Streams Unzip/Zip preserve order?


If I unzip a series of tuples, perform some asynchronous transformations on the two resulting streams, and then zip them back together, does Akka guarantee that the elements are re-zipped in their original order?

Example:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val unzip = builder.add(Unzip[Int, String])
  val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
  val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
  val zip = builder.add(Zip[Int, String])

  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1

  FlowShape(unzip.in, zip.out)
})

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
  .via(graph)
  .runWith(Sink.seq)

Await.result(out, 1.second)

In this simple test, the output is Vector((1,a-x), (2,b-x), (3,c-x)), so things look good. But I'm not sure I can trust that this will always be the case.
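A three-element run can pass by luck, because the futures may simply complete in submission order. A minimal sketch of a stronger check, assuming Akka 2.6 (where an implicit `ActorSystem` supplies the materializer, so `ActorMaterializer` is not needed): each future sleeps a random few milliseconds so completions arrive out of order, and the zipped output is compared with the expected pairs. `ZipOrderCheck`, `jitter`, and `run` are hypothetical names. A passing run is evidence, not proof; the guarantee itself comes from the Akka docs.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.util.Random

object ZipOrderCheck {
  // Completes after a random 0-19 ms pause, so futures finish out of submission order.
  def jitter[A](a: => A)(implicit ec: ExecutionContext): Future[A] = Future {
    blocking(Thread.sleep(Random.nextInt(20).toLong))
    a
  }

  // The question's graph, with jittered futures and higher parallelism.
  def graph(implicit ec: ExecutionContext): Flow[(Int, String), (Int, String), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val unzip = builder.add(Unzip[Int, String]())
      val increment = builder.add(Flow[Int].mapAsync(8)(n => jitter(n + 1)))
      val append = builder.add(Flow[String].mapAsync(8)(s => jitter(s"$s-x")))
      val zip = builder.add(Zip[Int, String]())
      unzip.out0 ~> increment ~> zip.in0
      unzip.out1 ~> append ~> zip.in1
      FlowShape(unzip.in, zip.out)
    })

  // Returns (actual, expected) for `count` input tuples of the form (i, i.toString).
  def run(count: Int): (Seq[(Int, String)], Seq[(Int, String)]) = {
    implicit val system: ActorSystem = ActorSystem("zip-order")
    import system.dispatcher // ExecutionContext for the futures
    try {
      val input = (0 until count).map(i => (i, i.toString))
      val expected = input.map { case (i, s) => (i + 1, s"$s-x") }
      val actual = Await.result(Source(input).via(graph).runWith(Sink.seq), 30.seconds)
      (actual, expected)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (actual, expected) = run(200)
    println(s"${actual.size} pairs, identical to expected: ${actual == expected}")
  }
}
```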

What raises some concern is this variation, which adds a filter to the Int branch:

val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])

unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1

// output: Vector((1,a-x), (3,b-x))

Even if ordering is preserved, the original tuple pairing is not: once the filter drops 2 (which came from (1, "b")), the 3 that came from (2, "c") is zipped with "b-x".
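If a branch might drop elements, one option (a sketch under the assumption of Akka 2.6, not something the Akka docs prescribe) is to skip Unzip/Zip for that part and transform each tuple as a unit, so a filter removes whole pairs rather than one side of a pair. `KeepPairsTogether`, `pairFlow`, and `run` are hypothetical names. With the same input, 3 stays paired with "c-x".

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object KeepPairsTogether {
  // Both halves are transformed inside one mapAsync, so they never travel separately;
  // the filter then removes whole tuples instead of one side of a pair.
  def pairFlow(implicit ec: ExecutionContext): Flow[(Int, String), (Int, String), NotUsed] =
    Flow[(Int, String)]
      .mapAsync(3) { case (n, s) => Future((n + 1, s"$s-x")) }
      .filter { case (n, _) => n != 2 }

  def run(): Seq[(Int, String)] = {
    implicit val system: ActorSystem = ActorSystem("keep-pairs")
    import system.dispatcher // ExecutionContext for the futures
    try {
      val input = List((0, "a"), (1, "b"), (2, "c"))
      Await.result(Source(input).via(pairFlow).runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The trade-off is that the two halves no longer get independent parallelism per branch; if that matters, the element that decides filtering has to travel with both halves.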

I can manually inspect my flows to make sure there's no filtering logic. But with that done, can I be sure the tuples will be re-zipped in precisely the order they were received?


Solution

  • TL;DR: yes, it does. From the Stream ordering section of the Akka docs:

    In Akka Streams almost all computation operators preserve input order of elements. This means that if inputs {IA1,IA2,...,IAn} “cause” outputs {OA1,OA2,...,OAk} and inputs {IB1,IB2,...,IBm} “cause” outputs {OB1,OB2,...,OBl} and all of IAi happened before all IBi then OAi happens before OBi.

    This property is even upheld by async operations such as mapAsync, however an unordered version exists called mapAsyncUnordered which does not preserve this ordering.

    However, in the case of Junctions which handle multiple input streams (e.g. Merge) the output order is, in general, not defined for elements arriving on different input ports. That is a merge-like operation may emit Ai before emitting Bi, and it is up to its internal logic to decide the order of emitted elements. Specialized elements such as Zip however do guarantee their outputs order, as each output element depends on all upstream elements having been signalled already – thus the ordering in the case of zipping is defined by this property.

    If you find yourself in need of fine grained control over order of emitted elements in fan-in scenarios consider using MergePreferred, MergePrioritized or GraphStage – which gives you full control over how the merge is performed.
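The mapAsync / mapAsyncUnordered distinction in the quoted docs can be seen in a minimal sketch, assuming Akka 2.6, where earlier elements deliberately take longer to complete. `OrderedVsUnordered`, `slowFirst`, and `run` are hypothetical names. mapAsync should emit 1 to 10 in input order; mapAsyncUnordered emits in completion order, so only its contents (not its order) are guaranteed.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, blocking}

object OrderedVsUnordered {
  // Earlier elements finish later: element n sleeps (10 - n) * 20 ms.
  def slowFirst(n: Int)(implicit ec: ExecutionContext): Future[Int] = Future {
    blocking(Thread.sleep((10 - n) * 20L))
    n
  }

  // Returns (output of mapAsync, output of mapAsyncUnordered) for inputs 1 to 10.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("ordering")
    import system.dispatcher // ExecutionContext for the futures
    try {
      val input = 1 to 10
      val ordered = Source(input).mapAsync(10)(n => slowFirst(n)).runWith(Sink.seq)
      val unordered = Source(input).mapAsyncUnordered(10)(n => slowFirst(n)).runWith(Sink.seq)
      (Await.result(ordered, 5.seconds), Await.result(unordered, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (ordered, unordered) = run()
    println(s"mapAsync:          $ordered")   // input order
    println(s"mapAsyncUnordered: $unordered") // completion order
  }
}
```

This is also why swapping mapAsync for mapAsyncUnordered in either branch of the question's graph would break the pairing: Zip combines elements in arrival order, so out-of-order completions on one side would be matched with the wrong elements from the other side.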