I try to connect multiple Flows in Akka Streams and handle their errors in different ways depending on the Flow. It can be accomplished using sth like that:
Flow[String, Either[ProcessingError, String], NotUsed]
And then divert response to error handler based on Either value.
My problem is, that some Flows return Future[String] instead of String and I don't know how to evaluate it to be able to catch error after each Flow and handle it in custom way.
To turn the Future
into an Either
without failing the stream, you can use
.mapAsync(1){ e =>
val f: Future[T] = ...
f.transformWith(_.toEither)
}
mapAsync
and mapAsyncUnordered
are the idiomatic ways to evaluate futures in Akka Streams. Be aware that a failing future in those will fail the stream, to handle errors in-stream, you need to react "immediately" on the future to convert it into Try
or Either
.