I'm trying to come up with different examples in order to understand the work of Keep.left or Keep.right
I have the following code:
val numSource = Source(1 to 10)
val numSource = Source(1 to 10)
val incrementFlow = Flow[Int].map(x => x +1)
val doubleFlow = Flow[Int].map(x => x * 10)
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
I was assuming that in someOtherFuture viaMat(incrementFlow)(Keep.left)
will ignore the increment of the element since I'm using materialized value of the source (and not the resulted one = Keep.right) and the result of the graph will be equal to num * 10.
BUT all 3 lines are giving me back the same result:
//20,30,40 .. 110
What I'm missing here? I have already checked the documentation and was trying to implement simple examples, but it looks that I got wrong the idea of materializing. Or maybe it happens since I'm working with a sequenced graph w/o any merged flows that receive two inputs?
The materialized value is best understood as a means for code outside of a stream which has started (whether still running or completed/failed) to interact with the stream in some way: it's the return value of running the stream, and we can see the effect of Keep.right
/Keep.left
in the code you posted:
val someFuture: NotUsed = numSource.via(incrementFlow).via(doubleFlow).to(Sink.foreach(println)).run
val someOtherFuture: Future[Done] = numSource.viaMat(incrementFlow)(Keep.left).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
val someOtherFuture2: Future[Done] = numSource.viaMat(incrementFlow)(Keep.right).viaMat(doubleFlow)(Keep.right).toMat(Sink.foreach(println))(Keep.right).run()
The materialized value of numSource
, incrementFlow
, and doubleFlow
is NotUsed
which basically means that those stages don't expose any way for code outside of the stream to interact with the stream.
For Sink.foreach
, the materialized value is a Future[Done]
which will complete successfully with Done
if the stream runs until the source has exhausted and all other stages succeeded, or complete with a failure containing the cause of the stream failing.