scala, akka, akka-stream

Equivalent to balancer, broadcast and merge in pure akka streams


In Akka Streams, using the GraphDSL builder, I can use the Balance, Broadcast and Merge operators:

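import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

Flow.fromGraph(GraphDSL.create() { implicit builder =>
 import GraphDSL.Implicits._ // brings the ~> operator into scope

 val balancer = builder.add(Balance[Result1](2))
 val merger = builder.add(Merge[Result2](2))

 // Two identical worker lanes, each step behind its own async boundary
 balancer.out(0) ~> step1.async ~> step2.async ~> merger.in(0)
 balancer.out(1) ~> step1.async ~> step2.async ~> merger.in(1)

 FlowShape(balancer.in, merger.out)
})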

How can I achieve the same logic using the plain Source, Sink and Flow API?

I can do something like this:

source.mapAsync(2)(elem => Future(...))

But, as far as I can see, it is not fully equivalent semantically to the first example.


Solution

  • Use Source.combine and Sink.combine. From the documentation:

    There is a simplified API you can use to combine sources and sinks with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A] without the need for using the Graph DSL. The combine method takes care of constructing the necessary graph underneath. In following example we combine two sources into one (fan-in):

    val sourceOne = Source(List(1))
    val sourceTwo = Source(List(2))
    val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
    
    val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))
    

    The same can be done for a Sink[T] but in this case it will be fan-out:

    val sendRemotely = Sink.actorRef(actorRef, "Done")
    val localProcessing = Sink.foreach[Int](_ => /* do something useful */ ())
    
    val sink = Sink.combine(sendRemotely, localProcessing)(Broadcast[Int](_))
    
    Source(List(0, 1, 2)).runWith(sink)
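
The documentation excerpt above only covers sources and sinks. For the Flow in the question, one possible sketch that avoids the Graph DSL is to split the stream with groupBy, run the steps in each substream behind async boundaries, and recombine with mergeSubstreams. This is an approximation, not an exact equivalent: Balance hands each element to whichever worker has demand, whereas the round-robin key used here assigns elements statically. The names BalanceSketch, workers, parallel and run, and the Int-based step1 and step2, are hypothetical stand-ins, and the code assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BalanceSketch {
  // Hypothetical stand-ins for step1 and step2 from the question.
  val step1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1)
  val step2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Number of parallel lanes, matching Balance(2) / Merge(2) in the question.
  val workers = 2

  val parallel: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .zipWithIndex                        // pair each element with a running index
      .groupBy(workers, _._2 % workers)    // static round-robin routing (assumption, not demand-based)
      .map(_._1)                           // drop the index again
      .via(step1.async)                    // async boundaries, as in the original graph
      .via(step2.async)
      .mergeSubstreams                     // fan-in, playing the role of Merge

  // Runs the flow over 1..6 and returns the collected results.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("balance-sketch")
    try Await.result(Source(1 to 6).via(parallel).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

As with Balance and Merge, the output order across lanes is not guaranteed, so any check on the result of run() should compare sorted values. If each step can be written as a function returning a Future, mapAsyncUnordered(2) is another option; unlike mapAsync, it does not preserve the input order, which brings it closer to the graph in the question.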