Search code examples
scalaakkaakka-stream

Akka streams, conditional divertLeft


I have a divertLeft function, based on the code here: https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12, which will divert Lefts to the specified sink, and pass Rights onward to the downstream consumer.

def divertLeft(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

The actual processing upstream, returns Left(new Exception(...)) in some circumstances, and Left(new Error(...)) in others, and I'd like to handle these differently.

This is rather unlovable, but I'd hoped it might work...

def divertLeftIgnoreError(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  /* As above, but if the Left value is an Error, then ignore it instead of diverting to the given destination */
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if element.isInstanceOf[Error] =>  (element, c) }
          .to(Sink.ignore),
        _._1.isLeft
      )
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if !element.isInstanceOf[Error] => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

... it doesn't. All the Lefts seem to be ignored, possibly because the .collect doesn't do what I think it does, and so the messages just fall off the end of the function without being handled. Can you even compose divertTo in that way?

I'd also considered writing a predicate (to pass to divertTo) for "_._1.isLeft and also the contents of that Left is an Exception", but I can't figure out the syntax for that.

Is trying to handle different kinds of Left just fundamentally ill-conceived? If so, what pattern should I be using to handle this?


Solution

  • Your issue is in the predicate function of divertTo as you thought. Any element that match the predicate will be diverted.

    In your case, first divertTo diverts all Left. Then the sink collects only the errors and send them to ignore sink. The other Lefts are filtered out by collect.

    What you want is indeed a predicate more precise, like the one you wrote in the case of collect:

    item => item._1 match {
      case Left(e) if e.isInstanceOf[Error] => true
      case _ => false
    }
    

    (This is only an example of how to write it, there are actually other ways to write it like more inline _._1.left.exists(_.isInstanceOf[Error]), pick the one you prefer).

    Do the same for each predicate and it should work as expected.

    Note that if you're going to ignore the Left(Error(..), you could filter them our first and keep a single divertTo.