I'm working with Akka Streams and trying to use Flow
stages for the most efficient way to describe an entire graph. In some of the stages I send messages to actors via the ask
pattern.
Of course, when I use the ask
pattern, I need to use mapTo
in order to get the expected type for further processing.
Here is an example:
val runnableGraph = Source.single(CheckEntity(entity))
.map { check =>
(postgresActor ? check)
.mapTo[CheckEntityResult]
.map {
case failure: PostgresFailure => Left(failure.message)
case pEntity: PEntity => Right(check)
}
}
.map {
_.map {
case Left(msg) => Future(Left(msg))
case Right(check) =>
(redisActor ? pEntity)
.mapTo[CheckEntityResult]
.map {
case failure: RedisFailure => Left(failure.message)
case rEntity: REntity => Right(rEntity)
}
}
}
.toMat(Sink.head)(Keep.right)
//The result's type is Future[Future[Either[String, Entity]]]
val futureResult = runnableGraph.run()
How do I get rid of the nested Future
between the stages?
One idea to make it easier to propagate a CheckEntity
element through the stream is to change your CheckEntityResult
class to contain the corresponding CheckEntity
instance. This would look something like this:
abstract class CheckEntityResult(entity: CheckEntity) extends Entity
case class PEntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
case class REntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
Then, after adjusting your actors to handle these messages, you could use Source # ask
and mapAsync
(adjust the levels of parallelism as needed) to interact with the actors and to avoid the nested Future
in the materialized value:
implicit val askTimeout = Timeout(5.seconds)
val runnableGraph = Source.single(CheckEntity(entity))
.ask[CheckEntityResult](parallelism = 3)(postgresActor)
.map {
case PostgresFailure(_, msg) => msg
case PEntity(e) => e
}
.mapAsync(parallelism = 3) {
case failureMsg: String => Future.successful(failureMsg)
case e: CheckEntity => (redisActor ? e).mapTo[CheckEntityResult]
}
.map {
case failureMsg: String => Left(failureMsg)
case RedisFailure(_, msg) => Left(msg)
case r: REntity => Right(r)
}
.toMat(Sink.head)(Keep.right)
val futureResult = runnableGraph.run() // Future[Either[String, Entity]]