Search code examples
scaladesign-patternsakkaactor

akka actor: Patterns.pipe for Either


I have a method like this:

def myFuture: Future[Either[MyLeft, MyRight]] = Future {
.
.
.
}

If I want to pipe the result, I use:

Patterns.pipe(myFuture,ec).to(destinationActor)

But I want in case of Left , send result to one actor and in case of Right send result to another actor. pseudo code like this:

MyPatterns.eitherPipe(myFuture,ec).to(leftConsumerActor,rightConsumerActor)

Solution

  • The source code of akka itself is a good hint what should be done. Have a look at akka.pattern.PipeToSupport:

    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
      future andThen {
        case Success(r) ⇒ recipient ! r
        case Failure(f) ⇒ recipient ! Status.Failure(f)
      }
    }
    

    So we can basically reuse this approach for our case with dispatching of Either:

    val result: Future[Either[Int, Throwable]] = Future.successful(Left(5))
    result andThen {
      case Success(Left(value)) => leftActor ! value
      case Success(Right(exception)) => rightActor ! exception
      case Failure(exception) => println("Failure")
    }
    

    Achieving desired DSL:

    We can try to achieve your DSL(eitherPipe() and to(...)) like this:

    trait MyEitherPipeSupport extends PipeToSupport {
    
        final class PipeableEitherFuture[L, R](val future: Future[Either[L, R]])(implicit executionContext: ExecutionContext) {
    
          def to(leftRef: ActorRef, rightRef: ActorRef, exceptionRef: ActorRef) = future andThen {
            case Success(Left(value)) ⇒ leftRef ! value
            case Success(Right(exception)) ⇒ rightRef ! exception
            case Failure(exception) ⇒ exceptionRef ! Status.Failure(exception)
          }
        }
    
        implicit def eitherPipe[L, R](future: Future[Either[L, R]])(implicit executionContext: ExecutionContext): PipeableEitherFuture[L, R] = new PipeableEitherFuture(future)
    
      }
    

    Now in your actor you just mix MyEitherPipeSupport in and you can write like this:

        val result: Future[Either[Int, Throwable]] = Future.successful(Left(5))
        eitherPipe(result).to(left, right, anotherOne)