Search code examples
scalaakkaakka-stream

Multiplex Akka sources/flows based on condition


Is there a way to multiplex two or more Akka sources or flows based on some external condition? It could look like this:

def cond: Boolean = ???

val src1 = Source.fromIterator(i1)
val src2 = Source.fromIterator(i2)
val src3 = Source.mux(src1, src2, cond)

Depending on cond result src3 should contain either items from src1 or items from src2, never both.

I have found what seems to be the opposite operation divertTo. At the same time, none of the fan-in operations seem to support conditional merging.


Solution

  • I would suggest something along the following:

    def mux[T](a: Source[T, Any], b: Source[T, Any])(cond: Int => Boolean): Source[T, Any] = {
      a.map((1, _)).merge(b.map((2, _)))
        .filter(t => cond(t._1))
        .map(_._2)
    }
    

    Simply put, it attaches an identifier to the element emitted by each source (here 1, 2, but could be anything), then filters with the provided cond function to keep only the elements coming from the currently selected source, then maps back to the element.

    I think zip is not a good idea because it "Emits when all of the inputs have an element available", i.e. even if there is an element available from source A and you actually want to switch to source A, zip will wait until there is an element available in B before emitting (ref).

    On the other hand, merge will emit immediately when any source has an available item.