scala jvm akka actor object-pooling

One-off object pooling with Actor provider


I have an object with heavy initialization cost and memory footprint. Initialization time is human-noticeable but creation frequency is low.

class HeavyClass {
  heavyInit()
}

My solution is to create a Provider actor that would have a single object created ahead of time and provide it instantly on request. The provider would then go on with creating the next object.

class HeavyClassProvider extends Actor {

  var hc: Option[HeavyClass] = Some(new HeavyClass())

  override def receive = {
    case "REQUEST" =>
      sender ! { hc getOrElse new HeavyClass() }
      self ! "RESPAWN"
      hc = None

    case "RESPAWN" if (hc == None) => hc = Some(new HeavyClass())
  }

}

And a consumer:

abstract class HeavyClassConsumer extends Actor {

  import context.dispatcher

  import akka.pattern.ask
  import scala.concurrent.duration._
  import akka.util.Timeout

  implicit val timeout = Timeout(5, SECONDS)

  var provider: ActorRef
  var hc: Option[HeavyClass] = None

  override def receive = {
    case "START" =>
      ((provider ask "REQUEST").mapTo[HeavyClass]
       onSuccess { case h: HeavyClass => hc = Some(h) })
  }

}

Is this a common pattern? The code feels wacky; is there an obvious, cleaner way of doing this?


Solution

  • The problem with your solution is that calling new HeavyClass() inside receive blocks the actor: it cannot process any other message until the constructor returns. Running the construction in a Future or in another Actor avoids that. Here is one way to do it:

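    import akka.actor.{Actor, Status}
    import akka.pattern.pipe
    import scala.concurrent.Future
    ...
    
    class HeavyClassProvider extends Actor {
      import context.dispatcher // ExecutionContext for Future(...) and pipeTo
    
      // start off async computation during init:
      var hc: Future[HeavyClass] = Future(new HeavyClass)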
    
      override def receive = {
        case "REQUEST" =>
          // send result to requester when it's complete or
          // immediately if it's already complete:
          hc pipeTo sender()
          // start a new computation and send to self:
          Future(new HeavyClass) pipeTo self
        case result: HeavyClass => // new result is ready
          hc = Future.successful(result) // update with newly computed result
        case Status.Failure(f) => // computation failed
          hc = Future.failed[HeavyClass](f)
          // maybe request a recomputation again
      }
    }
    

    (I didn't compile it)

    One particularity of my first solution is that it does not limit how many Futures are computed at the same time. If several requests arrive in quick succession, it starts one Future per request, which might not be desirable, although there is no race condition in this Actor. To limit that, introduce a Boolean flag in the Actor that records whether a computation is already in progress. Also, all these vars can be replaced with become/unbecome behaviors.

    Example that runs at most one Future computation at a time, even when multiple requests arrive:

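    import akka.actor.{Actor, Status}
    import akka.pattern.pipe
    import scala.concurrent.Future
    ...
    
    class HeavyClassProvider extends Actor {
      import context.dispatcher // ExecutionContext for Future(...) and pipeTo
    
      // start off async computation during init:
      var hc: Future[HeavyClass] = Future(new HeavyClass) pipeTo self
      var computing: Boolean = true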
    
      override def receive = {
        case "REQUEST" =>
          // send result to requester when it's complete or
          // immediately if it's already complete:
          hc pipeTo sender()
          // start a new computation and send to self:
          if (!computing) {
            computing = true // without this the flag never blocks a second computation
            Future(new HeavyClass) pipeTo self
          }
        case result: HeavyClass => // new result is ready
          hc = Future.successful(result) // update with newly computed result
          computing = false
        case Status.Failure(f) => // computation failed
          hc = Future.failed[HeavyClass](f)
          computing = false
          // maybe request a recomputation again
      }
    }
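
    The become/unbecome variant mentioned above might look roughly like the following. It is an uncompiled sketch against the classic Akka actor API that is meant to keep the same behavior as the flag version; the state names `computing` and `idle` and the empty stand-in `HeavyClass` are assumptions made for illustration and are not part of the original answer.

    ```scala
    import akka.actor.{Actor, Status}
    import akka.pattern.pipe
    import scala.concurrent.Future

    // Stand-in for the expensive class from the question (assumption).
    class HeavyClass

    class HeavyClassProvider extends Actor {
      import context.dispatcher // ExecutionContext for Future(...) and pipeTo

      // Start the first computation during init and pipe its result to self.
      private val initial: Future[HeavyClass] = Future(new HeavyClass) pipeTo self

      override def receive: Receive = computing(initial)

      // A replacement is being built; keep answering with the current instance.
      private def computing(hc: Future[HeavyClass]): Receive = {
        case "REQUEST" =>
          hc pipeTo sender()
        case result: HeavyClass => // new result is ready
          context.become(idle(Future.successful(result)))
        case Status.Failure(f) => // computation failed
          context.become(idle(Future.failed[HeavyClass](f)))
      }

      // Nothing is being built; the next request triggers a new computation.
      private def idle(hc: Future[HeavyClass]): Receive = {
        case "REQUEST" =>
          hc pipeTo sender()
          Future(new HeavyClass) pipeTo self
          context.become(computing(hc))
      }
    }
    ```

    The current instance and the "is a computation running" flag both move into the behavior that is installed, so the actor no longer needs any var.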
    

    EDIT: After discussing the requirements further in the comments, here is yet another implementation that sends a new object to the sender/client on each request in a non-blocking manner:

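    import akka.actor.Actor
    import akka.pattern.pipe
    import scala.concurrent.Future
    ...
    
    class HeavyClassProvider extends Actor {
      import context.dispatcher // ExecutionContext for Future(...) and pipeTo
    
      override def receive = {
        case "REQUEST" =>
          Future(new HeavyClass) pipeTo sender()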
      }
    }
    

    And then it can be simplified to:

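    import scala.concurrent.{ExecutionContext, Future}
    
    object SomeFactoryObject {
      // the caller supplies the ExecutionContext that runs the construction
      def computeLongOp(implicit ec: ExecutionContext): Future[HeavyClass] = Future(new HeavyClass)
    }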
    

    In this case no actors are needed. An Actor is worth using here only when it adds something on top of the asynchronous computation, such as caching results or coordination logic more complex than a plain Future offers; otherwise a Future is sufficient.
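
    If one pre-built instance should still be kept ready without an actor, a small holder around an atomic reference might be enough. The following is only a sketch under assumptions: `PrewarmedProvider`, `take`, `PrewarmedProviderDemo`, and the sleeping stand-in for `HeavyClass` are hypothetical names that do not appear in the question, and only the standard library is used.

    ```scala
    import java.util.concurrent.atomic.AtomicReference
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    // Stand-in for the expensive class from the question (assumption).
    class HeavyClass {
      Thread.sleep(50) // simulates the heavy initialization
    }

    // Keeps exactly one instance built, or under construction, at all times.
    class PrewarmedProvider(implicit ec: ExecutionContext) {
      private val next =
        new AtomicReference[Future[HeavyClass]](Future(new HeavyClass))

      // Hands out the pre-built instance and starts building its replacement.
      // getAndSet is atomic, so two callers never receive the same Future.
      def take(): Future[HeavyClass] = next.getAndSet(Future(new HeavyClass))
    }

    object PrewarmedProviderDemo {
      def run(): Boolean = {
        import ExecutionContext.Implicits.global
        val provider = new PrewarmedProvider
        val a = Await.result(provider.take(), 5.seconds)
        val b = Await.result(provider.take(), 5.seconds)
        a ne b // each request receives a distinct instance
      }
    }
    ```

    A failed construction is handed to whoever takes it next, so real code would probably want a retry or a fallback around `take()`.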