I'm trying to build a graph with a feedback loop for retries, but when I run it, execution stops at the first failure:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
  import GraphDSL.Implicits._

  val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
  val out = Sink.foreach(println)

  // purely for logging purposes
  val m2 = builder.add(Flow[Either[Int, Int]].map(i => { println("flow:" + i); i }))

  val mapper = builder.add(Flow[Int].statefulMapConcat { () =>
    var retries = 0
    i =>
      // deliberately fails to simulate failures
      // supposed to retry that element
      if (i == 3) {
        if (retries > 0) {
          List(Right(i))
        } else {
          retries += 1
          List(Left(i))
        }
      } else {
        List(Right(i))
      }
  })

  val concat = builder.add(Concat[Int](2))
  val broadcast = builder.add(Broadcast[Either[Int, Int]](2))

  in ~> concat.in(0)
  concat.out ~> mapper ~> m2 ~> broadcast
  broadcast.out(1).filter(_.isRight).map(_.right.get) ~> out
  broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> concat.in(1)

  ClosedShape
})

graph.run()
The mapper stateful flow is supposed to do some arbitrary calculation and track the retries. It outputs Either instances: Right on success and Left on failure.
Downstream, I'm trying to route the failures back to the concat for retries, and the successes to the sink.
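To check the retry-tracking logic of mapper separately from the graph wiring, it can be exercised on its own. The following is only a sketch in plain Scala with no Akka dependency: RetryMapperSketch and makeMapper are hypothetical names, the if/else is condensed but intended to behave the same as the one in the graph, and the repeated 3 in the input merely stands in for the element coming back through the feedback loop.

```scala
object RetryMapperSketch {
  // Same shape as the argument of statefulMapConcat: a factory that returns
  // a closure holding its own mutable state (hypothetical helper name).
  def makeMapper(): Int => List[Either[Int, Int]] = {
    var retries = 0
    i =>
      if (i == 3 && retries == 0) {
        // first time 3 is seen: simulate a failure
        retries += 1
        List(Left(i))
      } else {
        // every other element, and 3 on its second pass, succeeds
        List(Right(i))
      }
  }

  def main(args: Array[String]): Unit = {
    val mapper = makeMapper()
    // the second 3 simulates the failed element re-entering the mapper
    val results = List(1, 2, 3, 3).flatMap(mapper)
    println(results) // prints List(Right(1), Right(2), Left(3), Right(3))
  }
}
```

So the mapper itself would let the element through on its second pass; the question is why that second pass never happens inside the graph.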
The output looks like this:
flow:Right(1)
1
flow:Right(2)
2
flow:Left(3)
I haven't figured out why it doesn't work with Broadcast, but here is a working version with Partition:
val partition = builder.add(Partition[Either[Int, Int]](2, input => if (input.isRight) 0 else 1))
concat.out ~> mapper ~> m2 ~> partition.in
partition.out(0).map(_.right.get) ~> out
partition.out(1).map(_.left.get) ~> concat.in(1)