Akka streams feedback loop in a graph is not working


I'm trying to build a graph with a feedback loop for retries, but when I run it, execution stops at the first failure:

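import akka.stream.{ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Concat, Flow, GraphDSL, RunnableGraph, Sink, Source}

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._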

val in = Source(1 to 10).buffer(1, OverflowStrategy.fail)
val out = Sink.foreach(println)

// purely for logging purposes
val m2 = builder.add(Flow[Either[Int, Int]].map(i => {println("flow:" + i); i}))


val mapper = builder.add(Flow[Int].statefulMapConcat{ () =>
  var retries = 0
  i =>

  // deliberately fails to simulate failures
  // supposed to retry that element
  if (i == 3) {
    if (retries > 0){
      List(Right(i))
    } else {
      retries += 1
      List(Left(i))
    }
  } else {
    List(Right(i))
  }
})

val concat = builder.add(Concat[Int](2))

val broadcast = builder.add(Broadcast[Either[Int, Int]](2))


in ~> concat.in(0)

concat.out ~> mapper ~> m2 ~> broadcast

broadcast.out(1).filter(_.isRight).map(_.right.get) ~> out
broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> concat.in(1)


ClosedShape
})

graph.run()

The stateful mapper flow is supposed to do some arbitrary calculation and keep track of retries. It outputs Either instances: Right on success and Left on failure.
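
The mapper's intended behaviour can be checked on its own, without Akka. The following is a minimal sketch in plain Scala; MapperSketch and makeMapper are hypothetical names introduced only for illustration, and the function body is assumed to be equivalent to the one passed to statefulMapConcat above.

```scala
// Hypothetical helper, not part of the original graph: it builds the same kind
// of stateful function that the question passes to statefulMapConcat.
object MapperSketch {
  def makeMapper(): Int => List[Either[Int, Int]] = {
    var retries = 0 // state private to this function instance
    i =>
      if (i == 3 && retries == 0) {
        retries += 1
        List(Left(i)) // the first attempt at 3 is reported as a failure
      } else {
        List(Right(i)) // everything else, including the retry of 3, succeeds
      }
  }

  def main(args: Array[String]): Unit = {
    val mapper = makeMapper()
    // Feed 3 twice to simulate the element coming back through the feedback loop.
    println(List(1, 2, 3, 3, 4).flatMap(mapper))
    // prints List(Right(1), Right(2), Left(3), Right(3), Right(4))
  }
}
```

So the mapper itself would let a retried 3 through as Right(3); the stall seen in the output comes from the wiring of the graph, not from this function.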

I then try to route the failures back to the concat for a retry and the successes to the sink.

The output looks like this:

flow:Right(1)
1
flow:Right(2)
2
flow:Left(3)

Solution

  • I haven't figured out why it doesn't work with Broadcast, but here is a working version with Partition:

    val partition = builder.add(Partition[Either[Int, Int]](2, input => if (input.isRight) 0 else 1))
    concat.out ~> mapper ~> m2 ~> partition.in
    partition.out(0).map(_.right.get) ~> out
    partition.out(1).map(_.left.get) ~> concat.in(1)
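
    The routing decision used by Partition can be looked at in isolation. What follows is only a sketch in plain Scala, without Akka; RoutingSketch and route are hypothetical names. It also shows pattern matching with collect as an alternative to .right.get and .left.get, which are deprecated as of Scala 2.13. Akka Streams flows offer a collect operator that takes the same kind of partial function, so the idea should carry over, though that variant is not tested here.

```scala
// Hypothetical names, for illustration only.
object RoutingSketch {
  // Same decision as the Partition lambda: port 0 for successes, port 1 for failures.
  val route: Either[Int, Int] => Int = {
    case Right(_) => 0
    case Left(_)  => 1
  }

  def main(args: Array[String]): Unit = {
    val results: List[Either[Int, Int]] = List(Right(1), Left(3), Right(4))

    println(results.map(route))                     // prints List(0, 1, 0)
    // collect unwraps and filters in one step, with no unsafe .get call
    println(results.collect { case Right(v) => v }) // prints List(1, 4)
    println(results.collect { case Left(v) => v })  // prints List(3)
  }
}
```

    Unlike Broadcast, which hands every element to both outputs and relies on the downstream filter, Partition sends each element to exactly one output, so no filter is needed on either branch.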