
Default value for MergeLatest


The official documentation of MergeLatest states:

MergeLatest emits list for each element emitted from some input stream, but only after each input stream emitted at least one element.
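The documented rule can be modelled in plain Scala, without Akka, to make the behavior concrete. This is a hypothetical sketch: MergeLatestModel and mergeLatest are illustrative names, not Akka API. Each event is a (port, value) pair, and a snapshot is emitted only after every port has produced at least one element. Fed the event order used in the question, it yields only the last two lists.

```scala
// Pure-Scala model of the documented MergeLatest emission rule (no Akka involved).
// Hypothetical helper for illustration; it ignores backpressure and completion.
object MergeLatestModel {
  def mergeLatest[T](ports: Int, events: Seq[(Int, T)]): List[List[T]] = {
    // None marks a port that has not emitted yet; each event overwrites one slot.
    val states = events.toList.scanLeft(Vector.fill[Option[T]](ports)(None)) {
      case (latest, (port, value)) => latest.updated(port, Some(value))
    }
    // Skip the initial empty state and emit only once every port holds a value.
    states.tail.filter(_.forall(_.isDefined)).map(_.flatten.toList)
  }

  def main(args: Array[String]): Unit = {
    // Port 0 emits 1 then 2, port 1 emits 1, port 2 emits 1 then 2.
    val events = Seq(0 -> 1, 0 -> 2, 1 -> 1, 2 -> 1, 2 -> 2)
    mergeLatest(3, events).foreach(println)
    // List(2, 1, 1)
    // List(2, 1, 2)
  }
}
```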

My question is: can this be bypassed? For example, can we provide a default value such that it will start producing lists as soon as it receives at least one element from any input stream?

The following should be the new behavior:

(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)

Instead of:

(2,1,1)
(2,1,2)

I need those first lists to be pushed to the output stream as well.


Solution

  • Unfortunately, mergeLatest doesn't provide such an option, and there doesn't seem to be any Stream operator that readily does this. One way would be to repurpose MergeLatest for this specific need. The good news is that the necessary change is rather simple, since MergeLatest is implemented as a standard GraphStage with a UniformFanInShape.

    import akka.stream.scaladsl._
    import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
    import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
    import scala.collection.immutable
    
    // Factory mirroring MergeLatest.apply, plus a default value for inputs that have not emitted yet.
    object MergeLatestWithDefault {
      def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
        new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
    }
    
    final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
        extends GraphStage[UniformFanInShape[T, M]] {
      require(inputPorts >= 1, "input ports must be >= 1")
    
      val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
      val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
      override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with OutHandler {
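          // Kept from MergeLatest; it is only written to and no longer gates emission.
          private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
          private var runningUpstreams: Int = inputPorts
          private def upstreamsClosed: Boolean = runningUpstreams == 0
          // Pre-filled with `default` so every slot holds a value before its input has pushed.
          private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)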
    
          override def preStart(): Unit = in.foreach(tryPull)
    
          in.zipWithIndex.foreach {
            case (input, index) =>
              setHandler(
                input,
                new InHandler {
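                  override def onPush(): Unit = {
                    // Record the latest value for this input port.
                    messages.update(index, grab(input))
                    activeStreams.add(index)
                    // Unlike MergeLatest, emit on every push: ports that have not
                    // pushed yet still contribute their `default` value.
                    emit(out, buildElem(messages.asInstanceOf[Array[T]]))
                    tryPull(input)
                  }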
    
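                  override def onUpstreamFinish(): Unit = {
                    // eagerClose: complete as soon as any input finishes;
                    // otherwise wait until all inputs have finished.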
                    if (!eagerClose) {
                      runningUpstreams -= 1
                      if (upstreamsClosed) completeStage()
                    } else completeStage()
                  }
                })
          }
    
          // Downstream demand: pull every input that is not already pulled.
          override def onPull(): Unit = {
            var i = 0
            while (i < inputPorts) {
              if (!hasBeenPulled(in(i))) tryPull(in(i))
              i += 1
            }
          }
    
          setHandler(out, this)
        }
    
      override def toString = "MergeLatestWithDefault"
    }
    

    Only a small code change is needed. Besides the additional default parameter, which is used to pre-fill the messages array, the only change is that emit within onPush is no longer conditional on every input having emitted at least once.

    Testing it out:

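    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import scala.concurrent.duration._
    
    object CustomMerge {
    
      def main(args: Array[String]): Unit = {
    
        implicit val system: ActorSystem = ActorSystem("system")
        // On Akka 2.5 an implicit materializer must also be in scope, e.g.
        // implicit val materializer: akka.stream.ActorMaterializer = akka.stream.ActorMaterializer()
        import system.dispatcher // ExecutionContext for onComplete
    
        val s1 = Source(1 to 3)
        val s2 = Source(11 to 13).throttle(1, 50.millis)
        val s3 = Source(101 to 103).throttle(1, 100.millis)
    
        Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0))
          .runForeach(println)
          .onComplete(_ => system.terminate()) // shut down once the stream completes
      }
    }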
    
    // Output (from one run; the exact interleaving depends on timing):
    //
    // List(1, 0, 0)
    // List(1, 11, 0)
    // List(1, 11, 101)
    // List(2, 11, 101)
    // List(2, 12, 101)
    // List(3, 12, 101)
    // List(3, 13, 101)
    // List(3, 13, 102)
    // List(3, 13, 103)
    

    As a bonus, while mergeLatest is only available in Akka Streams 2.6+, this repurposed code appears to work fine on 2.5, based on my brief testing.
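The change in the answer (pre-filling the messages array with default and emitting on every push) can be checked against the output the question asks for with a small plain-Scala model. It is a sketch, not Akka code: MergeLatestWithDefaultModel and mergeLatestWithDefault are hypothetical names, and the model ignores backpressure, concurrency and completion.

```scala
// Pure-Scala model of the MergeLatestWithDefault emission rule (no Akka involved).
// Hypothetical helper for illustration; it ignores backpressure and completion.
object MergeLatestWithDefaultModel {
  def mergeLatestWithDefault[T](ports: Int, default: T, events: Seq[(Int, T)]): List[List[T]] = {
    // The state starts pre-filled with `default`, like the stage's `messages` array;
    // each (port, value) event overwrites one slot.
    val states = events.toList.scanLeft(Vector.fill(ports)(default)) {
      case (latest, (port, value)) => latest.updated(port, value)
    }
    // Drop the initial all-default state (nothing is emitted before the first push),
    // then emit one snapshot per event, unconditionally.
    states.tail.map(_.toList)
  }

  def main(args: Array[String]): Unit = {
    // Same event order as in the question, with 0 as the default value.
    val events = Seq(0 -> 1, 0 -> 2, 1 -> 1, 2 -> 1, 2 -> 2)
    mergeLatestWithDefault(3, 0, events).foreach(println)
    // List(1, 0, 0)
    // List(2, 0, 0)
    // List(2, 1, 0)
    // List(2, 1, 1)
    // List(2, 1, 2)
  }
}
```

The only difference from a model of the stock rule is the missing "every port has emitted" filter, which mirrors the single behavioral change made to the GraphStage.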