Search code examples
scalaakka-stream

Usage of Keep.right / Keep.left within Akka Streams does not affect the result


I'm trying to come up with different examples in order to understand the work of Keep.left or Keep.right

I have the following code:

val numSource = Source(1 to 10)
  val numSource = Source(1 to 10)
  val incrementFlow = Flow[Int].map(x => x +1)
  val doubleFlow = Flow[Int].map(x => x * 10)

  val someFuture: NotUsed =           numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
  val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
  val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()

I was assuming that in someOtherFuture viaMat(incrementFlow)(Keep.left) will ignore the increment of the element since I'm using materialized value of the source (and not the resulted one = Keep.right) and the result of the graph will be equal to num * 10. BUT all 3 lines are giving me back the same result: //20,30,40 .. 110

What I'm missing here? I have already checked the documentation and was trying to implement simple examples, but it looks that I got wrong the idea of materializing. Or maybe it happens since I'm working with a sequenced graph w/o any merged flows that receive two inputs?


Solution

  • The materialized value is best understood as a means for code outside of a stream which has started (whether still running or completed/failed) to interact with the stream in some way: it's the return value of running the stream, and we can see the effect of Keep.right/Keep.left in the code you posted:

    val someFuture: NotUsed =           numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
    val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
    val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
    

    The materialized value of numSource, incrementFlow, and doubleFlow is NotUsed which basically means that those stages don't expose any way for code outside of the stream to interact with the stream.

    For Sink.foreach, the materialized value is a Future[Done] which will complete successfully with Done if the stream runs until the source has exhausted and all other stages succeeded, or complete with a failure containing the cause of the stream failing.