
How to get rid of "extra" Future in Akka Stream Flow stages?


I'm working with Akka Streams and trying to use Flow stages to describe an entire graph as efficiently as possible. In some of the stages I send messages to actors via the ask pattern.

Of course, when I use the ask pattern, I need to use mapTo in order to get the expected type for further processing.

Here is an example:

val runnableGraph = Source.single(CheckEntity(entity))
  .map { check =>
    (postgresActor ? check)
      .mapTo[CheckEntityResult]
      .map {
        case failure: PostgresFailure => Left(failure.message)
        case pEntity: PEntity => Right(check)
      }
  }
  .map {
    _.map {
      case Left(msg) => Future(Left(msg))
      case Right(check) =>
        (redisActor ? check)
          .mapTo[CheckEntityResult]
          .map {
            case failure: RedisFailure => Left(failure.message)
            case rEntity: REntity => Right(rEntity)
          }
    }
  }
  .toMat(Sink.head)(Keep.right)

// The result's type is Future[Future[Either[String, Entity]]]
val futureResult = runnableGraph.run()

How do I get rid of the nested Future between the stages?


Solution

  • One way to make it easier to propagate a CheckEntity element through the stream is to have your CheckEntityResult class carry the corresponding CheckEntity instance. This would look something like this:

    abstract class CheckEntityResult(entity: CheckEntity) extends Entity
    
    case class PEntity(entity: CheckEntity) extends CheckEntityResult(entity)
    case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
    
    case class REntity(entity: CheckEntity) extends CheckEntityResult(entity)
    case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
    

    Then, after adjusting your actors to handle these messages, you could use Source#ask and mapAsync (adjust the parallelism levels as needed) to interact with the actors. Both stages emit the completed value of each Future rather than the Future itself, which avoids the nested Future in the materialized value:

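    import akka.pattern.ask
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._
    
    implicit val askTimeout: Timeout = Timeout(5.seconds)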
    
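    val runnableGraph = Source.single(CheckEntity(entity))
      // Ask postgresActor for each element; the stage emits the reply, not a Future.
      .ask[CheckEntityResult](parallelism = 3)(postgresActor)
      // Emit the failure message on failure, or the original CheckEntity on success.
      .map {
        case PostgresFailure(_, msg) => msg
        case PEntity(e) => e
      }
      // mapAsync unwraps the Future returned for each element, so no nesting remains.
      .mapAsync(parallelism = 3) {
        case failureMsg: String => Future.successful(failureMsg)
        case e: CheckEntity => (redisActor ? e).mapTo[CheckEntityResult]
      }
      // Fold both failure paths into Left and the Redis success into Right.
      .map {
        case failureMsg: String => Left(failureMsg)
        case RedisFailure(_, msg) => Left(msg)
        case r: REntity => Right(r)
      }
      .toMat(Sink.head)(Keep.right)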
    
    val futureResult = runnableGraph.run() // Future[Either[String, Entity]]
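
The pipeline above lets the element type widen between stages, because the first map emits either a String or a CheckEntity. As a minimal sketch of one alternative, the variant below keeps Either as the element type in both stages and lets each mapAsync flatten its own Future. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The TypedPipeline object, the run helper, the stub actors, and the simplified message types (a String id in place of the original entity) are hypothetical stand-ins, not part of the original question or answer.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TypedPipeline {
  // Hypothetical, simplified versions of the message types used above.
  final case class CheckEntity(id: String)
  sealed trait CheckEntityResult
  final case class PEntity(entity: CheckEntity) extends CheckEntityResult
  final case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult
  final case class REntity(entity: CheckEntity) extends CheckEntityResult
  final case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult

  // Stub actors (assumption): they always reply with the success message.
  class PostgresStub extends Actor {
    def receive: Receive = { case c: CheckEntity => sender() ! PEntity(c) }
  }
  class RedisStub extends Actor {
    def receive: Receive = { case c: CheckEntity => sender() ! REntity(c) }
  }

  def run(id: String)(implicit system: ActorSystem): Future[Either[String, REntity]] = {
    implicit val ec: ExecutionContext = system.dispatcher
    implicit val askTimeout: Timeout = Timeout(5.seconds)

    val postgresActor: ActorRef = system.actorOf(Props(new PostgresStub))
    val redisActor: ActorRef = system.actorOf(Props(new RedisStub))

    Source.single(CheckEntity(id))
      // Stage 1: ask Postgres; mapAsync emits the Either, not a Future of it.
      .mapAsync[Either[String, CheckEntity]](parallelism = 1) { check =>
        (postgresActor ? check).mapTo[CheckEntityResult].map {
          case PostgresFailure(_, msg) => Left(msg)
          case PEntity(e)              => Right(e)
          case other                   => Left(s"unexpected reply: $other")
        }
      }
      // Stage 2: pass failures through unchanged, ask Redis only on success.
      .mapAsync[Either[String, REntity]](parallelism = 1) {
        case Left(msg) => Future.successful(Left(msg))
        case Right(check) =>
          (redisActor ? check).mapTo[CheckEntityResult].map {
            case RedisFailure(_, msg) => Left(msg)
            case r: REntity           => Right(r)
            case other                => Left(s"unexpected reply: $other")
          }
      }
      .runWith(Sink.head)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("typed-pipeline")
    try println(Await.result(run("42"), 10.seconds))
    finally system.terminate()
  }
}
```

Keeping Either as the element type means the compiler can check the matches in the second stage, at the cost of repeating the ask and mapTo calls instead of using Source#ask. Whether that trade-off is worthwhile depends on how many stages the real graph has.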