Concurrency within actors using Futures


I am wondering if there is a better way to handle async initialization of values within an Actor. Code inside an actor is of course thread safe, but using Futures puts a wrinkle in that (and you have to make sure you don't close over context or sender). Consider the following:

class ExampleActor(ref1: ActorRef, ref2: ActorRef) extends Actor {

  implicit val ec = context.dispatcher

  val promise1 = Promise[Int]
  val promise2 = Promise[Int]

  def receive = {
    case Request1.Response(x) => promise1.success(x)
    case Request2.Response(y) => promise2.success(y)
    case CombinedResponse(x, y) => x + y
  }

  promise1.future foreach { x =>
    promise2.future foreach { y =>
      self ! CombinedResponse(x, y) 
    }
  }

  ref1 ! Request1
  ref2 ! Request2
}

Is there a better/more idiomatic way of handling parallel requests like this?


Solution

  • You don't actually need futures to handle a multi-part response:

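    // parent is assumed to be the ActorRef that should receive the result
    var x: Option[Int] = None
    var y: Option[Int] = None
    
    def receive = {
      // pattern variables are named a and b so they don't shadow the vars x and y
      case Request1.Response(a) => x = Some(a); checkParts()
      case Request2.Response(b) => y = Some(b); checkParts()
    }
    
    // sends the sum only once both parts have arrived
    def checkParts(): Unit = for {
       xx <- x
       yy <- y
    } parent ! (xx + yy)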
    

    By the way, you may use for-comprehension in the same way even with futures.
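As a minimal sketch of that remark, the same for-comprehension shape can combine two futures using only the standard library. The names `CombineFutures`, `combine` and `demo` are hypothetical and not part of the original answer; inside an actor, the combined result would still have to come back as a message (for example via Akka's `pipeTo` pattern) rather than touch actor state from the callback.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, for illustration only
object CombineFutures {
  // Both futures are created before the for-comprehension,
  // so they run in parallel; only the combination is sequenced.
  def combine(f1: Future[Int], f2: Future[Int]): Future[Int] =
    for {
      x <- f1
      y <- f2
    } yield x + y

  // Completes the two parts out of order and waits for the sum.
  def demo(): Int = {
    val p1 = Promise[Int]()
    val p2 = Promise[Int]()
    val sum = combine(p1.future, p2.future)
    p2.success(2)
    p1.success(1)
    Await.result(sum, 1.second)
  }
}
```

The order in which the two parts complete does not matter; the sum becomes available once both are present.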

    A more functional way to manage the actor's state:

    case class Resp1(x: Int)
    case class Resp2(y: Int)
    case class State(x: Option[Int], y: Option[Int])
    
    class Worker(parent: ActorRef) extends Actor {
      def receive = process(State(None, None))
    
      def process(s: State): Receive = edge(s) andThen { sn => 
        context become process(sn)
        for {
           xx <- sn.x
           yy <- sn.y
        } parent ! xx + yy //action
      }
    
      def edge(s: State): PartialFunction[Any, State] = { //managing state
        case Resp1(x) => s.copy(x = Some(x))
        case Resp2(y) => s.copy(y = Some(y))
      }
    
    }
    

    Reusing the actor instead of creating a future is better because promise.success performs an unmanaged side effect: it submits a task to an executor, so it is not purely functional. Keeping the state in the actor is better, because side effects inside an actor are always consistent with one another: they are applied step by step, and only in response to a message. So you can view the actor as a fold over an infinite collection of incoming messages, with the state and the (also infinite) messages sent by the actor acting as the fold's accumulator.
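The fold analogy can be sketched without Akka at all, using a finite message list in place of the infinite mailbox. Everything in the `FoldSketch` object is a hypothetical illustration, not the answer's code: the accumulator pairs the current state with the list of sums that would have been sent to the parent.

```scala
// Hypothetical stand-alone model of the actor as a fold
object FoldSketch {
  sealed trait Msg
  final case class Resp1(x: Int) extends Msg
  final case class Resp2(y: Int) extends Msg
  final case class State(x: Option[Int], y: Option[Int])

  // One step: compute the next state and any message it would emit.
  def step(s: State, m: Msg): (State, List[Int]) = {
    val next = m match {
      case Resp1(x) => s.copy(x = Some(x))
      case Resp2(y) => s.copy(y = Some(y))
    }
    val out = (for { xx <- next.x; yy <- next.y } yield xx + yy).toList
    (next, out)
  }

  // The "actor": a left fold over the incoming messages.
  def run(msgs: Seq[Msg]): (State, List[Int]) =
    msgs.foldLeft((State(None, None), List.empty[Int])) {
      case ((s, sent), m) =>
        val (next, out) = step(s, m)
        (next, sent ++ out)
    }
}
```

Here `step` plays the role of `edge` plus the action in `process`: each message is handled strictly after the previous one, so no two state updates can interleave.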

    Talking about Akka, its actors have some IoC-like features, such as automatic exception handling (through supervision), which isn't available inside a future. In your case, you have to introduce an additional composite message to get back into the actor's IoC context. Adding any action other than self ! CombinedResponse(x, y) inside the future callback (which another developer might do by accident, for example to implement a workaround) is unsafe.