
Custom handling Future failures in Akka Streams Flow


I am trying to connect multiple Flows in Akka Streams and handle their errors differently depending on the Flow. This can be accomplished with a Flow whose output type is an Either, for example:

Flow[String, Either[ProcessingError, String], NotUsed]

The response can then be diverted to an error handler based on the Either value.

My problem is that some Flows return Future[String] instead of String, and I don't know how to evaluate the Future so that I can catch the error after each Flow and handle it in a custom way.


Solution

  • To turn the Future into an Either without failing the stream, you can use

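    // requires: import scala.util.Success
    .mapAsync(1) { e =>
      val f: Future[T] = ...
      // transform takes Try[T] => Try[S]; wrapping the Either in Success
      // yields a Future[Either[Throwable, T]] that never fails
      f.transform(t => Success(t.toEither))
    }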
    

    mapAsync and mapAsyncUnordered are the idiomatic ways to evaluate futures in Akka Streams. Be aware that a failed future in either of them fails the stream. To handle errors in-stream, you need to react "immediately" on the future, inside the function passed to mapAsync, and convert its result into a Try or Either.
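
    The conversion itself can be tried out without a stream. The following is a minimal sketch using only the Scala standard library (2.12 or later); `liftToEither` and `process` are hypothetical names introduced for illustration and are not part of Akka or of the question's code.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

object FutureToEither {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper: turns a possibly failing Future[T] into a Future
  // that always succeeds and carries the outcome as an Either.
  def liftToEither[T](f: Future[T]): Future[Either[Throwable, T]] =
    f.transform(t => Success(t.toEither))

  // Hypothetical stand-in for a processing step that returns a Future.
  def process(s: String): Future[String] =
    if (s.isEmpty) Future.failed(new IllegalArgumentException("empty input"))
    else Future.successful(s.toUpperCase)

  def main(args: Array[String]): Unit = {
    val ok  = Await.result(liftToEither(process("abc")), 1.second)
    val bad = Await.result(liftToEither(process("")), 1.second)
    println(ok)         // Right(ABC)
    println(bad.isLeft) // true
  }
}
```

    Inside a stream, the same helper would be called from the function given to mapAsync, for example `.mapAsync(1)(e => liftToEither(process(e)))`, so that downstream stages see an Either[Throwable, String] instead of a failed stream. If a custom error type such as the question's ProcessingError is wanted on the left side, the Throwable could be mapped to it at this point.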