Factorial calculation using Scala actors

How can I compute the factorial using Scala actors?

And would it be more time-efficient than, for instance:

def factorial(n: Int): BigInt = (BigInt(1) to BigInt(n)).par.product

  • Problem

    You have to split your input into partial products. These partial products can then be calculated in parallel and multiplied together to get the final product.
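    Here is a minimal sketch of this split, compute, and combine idea. It assumes plain `scala.concurrent.Future`s on the global execution context instead of actors, so it needs no Akka dependency. The helper name `chunkedFactorial` and its `chunks` parameter are hypothetical and chosen only for illustration:

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Hypothetical helper: split 1..n into at most `chunks` contiguous ranges,
    // multiply each range in its own Future, then multiply the partial products.
    def chunkedFactorial(n: Int, chunks: Int): BigInt = {
      require(n >= 0 && chunks >= 1)
      if (n <= 1) BigInt(1)
      else {
        val size = (n + chunks - 1) / chunks // ceiling division
        val partials: Vector[Future[BigInt]] =
          (1 to n).grouped(size).toVector.map { group =>
            // One partial product per chunk, computed concurrently
            Future(group.foldLeft(BigInt(1))((acc, x) => acc * BigInt(x)))
          }
        // Wait for all partial products, then combine them
        val total: Future[BigInt] = Future.sequence(partials).map(_.product)
        Await.result(total, 1.minute)
      }
    }

    println(chunkedFactorial(10, 3)) // 3628800
    ```

    With `n = 10` and `chunks = 3`, the ranges are 1..4, 5..8 and 9..10, and their partial products 24, 1680 and 90 multiply to 3628800.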

    This is an instance of a broader class of problems: the so-called parallel prefix calculation. You can read up about it on Wikipedia.

    Short version: when you calculate a*b*c*d with an associative operation _ * _, you can structure the calculation as a*(b*(c*d)) or as (a*b)*(c*d). With the second structure, you can calculate a*b and c*d in parallel and then compute the final result from these two partial results. With more input values, you apply the same split recursively.
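    The recursive version of this idea can also be sketched in plain Scala. This minimal sketch again assumes `scala.concurrent.Future` rather than actors. The names `treeReduce` and `threshold` are hypothetical. Each level only regroups the operands and never reorders them, so it needs associativity but not commutativity:

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Hypothetical helper: reduce `values` with an associative `op` by splitting
    // the input in half, reducing both halves concurrently and combining them,
    // i.e. (a*b)*(c*d) instead of a*(b*(c*d)). Segments of at most `threshold`
    // elements are reduced sequentially.
    def treeReduce[T](values: IndexedSeq[T], threshold: Int)(op: (T, T) => T): Future[T] = {
      require(values.nonEmpty, "cannot reduce an empty sequence")
      require(threshold >= 1, "threshold must be positive")
      if (values.length <= threshold) Future(values.reduceLeft(op))
      else {
        val (left, right) = values.splitAt(values.length / 2)
        val l = treeReduce(left, threshold)(op)  // both halves are started
        val r = treeReduce(right, threshold)(op) // before either one is awaited
        l.zipWith(r)(op) // the left result stays on the left: order is preserved
      }
    }

    val n = 1000
    val fact = Await.result(treeReduce((1 to n).map(BigInt(_)), 64)(_ * _), 1.minute)
    println(fact == (1 to n).map(BigInt(_)).product) // true

    // Associative but not commutative: string concatenation keeps its order
    val word = Await.result(treeReduce(Vector("a", "b", "c", "d", "e"), 1)(_ + _), 1.minute)
    println(word) // abcde
    ```

    The string example shows why associativity alone is enough. The split changes the bracketing to ("a" + "b") + ("c" + ("d" + "e")), but the operands keep their original order.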



    This sounds a little bit like a homework assignment, so I will provide a solution that has two properties:

    1. It contains a small bug
    2. It shows how to solve parallel prefix in general, without solving the problem directly

    So you can see how the solution should be structured, but no one can use it to cheat on her homework.

    Solution in detail

    First, I need a few imports:

    import akka.actor._
    import akka.event.Logging
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration

    Then I create some helper classes for the communication between the actors:

    // Request: reduce `values` with `fn`; `index` is the segment's position in the parent's result
    case class Calculate[T](values : Seq[T], index : Int, parallelLimit : Int, fn : (T,T) => T)
    trait CalculateResponse
    case class CalculationResult[T](result : T, index : Int) extends CalculateResponse
    case object Busy extends CalculateResponse

    Instead of telling the sender that it is busy, the actor could also use the stash or implement its own queue for partial results. But in this case I think the sender should decide how many parallel calculations are allowed.

    Now I create the actor:

    class ParallelPrefixActor[T] extends Actor {
      val log = Logging(context.system, this)
      // Type arguments are erased at runtime, so children are plain ParallelPrefixActors
      val subCalculation = Props(classOf[ParallelPrefixActor[BigInt]])
      val fanOut = 2
      def receive = waitForCalculation

      def waitForCalculation : Actor.Receive = {
        case c : Calculate[T] =>
          log.debug(s"Start calculation for ${c.values.length} values, segment nr. ${c.index}, from ${c.values.head} to ${c.values.last}")
          if (c.values.length < c.parallelLimit) {
            // Small enough: reduce the segment directly and reply
            log.debug("Calculating result direct")
            val result = c.values.reduceLeft(c.fn)
            sender() ! CalculationResult(result, c.index)
          } else {
            // Too big: split into at most fanOut segments, one child actor per segment
            val groupSize: Int = Math.max(1, (c.values.length / fanOut) + Math.min(c.values.length % fanOut, 1))
            log.debug(s"Splitting calculation for ${c.values.length} values up to ${fanOut} children, ${groupSize} elements each, limit ${c.parallelLimit}")
            val segments = c.values.grouped(groupSize).toVector
            log.debug("Starting children")
            segments.zipWithIndex.foreach { case (values, index) =>
              context.actorOf(subCalculation) ! c.copy(values = values, index = index)
            }
            // Placeholders, overwritten by position as the partial results arrive
            val partialResults: Vector[T] = segments.map(_.head)
            log.debug(s"Waiting for ${partialResults.length} results (${partialResults.indices})")
            context.become(waitForResults(segments.length, partialResults, c, sender()), discardOld = true)
          }
      }

      def waitForResults(outstandingResults : Int, partialResults : Vector[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = {
        // While waiting for the children, new requests are rejected
        case _ : Calculate[_] => sender() ! Busy
        case r : CalculationResult[T] =>
          log.debug(s"Putting result ${r.result} on position ${r.index} in ${partialResults.length}")
          val updatedResults = partialResults.updated(r.index, r.result)
          log.debug("Killing sub-worker")
          sender() ! PoisonPill
          if (outstandingResults == 1) {
            // Last partial result arrived: combine all of them in segment order
            log.debug("Calculating result from partial results")
            val result = updatedResults.reduceLeft(originalRequest.fn)
            originalSender ! CalculationResult(result, originalRequest.index)
            context.become(waitForCalculation, discardOld = true)
          } else {
            log.debug(s"Still waiting for ${outstandingResults - 1} results")
            // For fanOut > 2 one could here already combine consecutive partial results
            context.become(waitForResults(outstandingResults - 1, updatedResults, originalRequest, originalSender), discardOld = true)
          }
      }
    }
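    The `groupSize` expression in the actor is a ceiling division: `length / fanOut` rounded up, but at least 1. As a result, `grouped` yields at most `fanOut` segments. The arithmetic can be checked quickly without Akka. `groupSize` and `segmentSizes` are hypothetical standalone helpers that copy the expression from the actor:

    ```scala
    // Same expression as in ParallelPrefixActor: ceil(length / fanOut), at least 1
    def groupSize(length: Int, fanOut: Int): Int =
      Math.max(1, (length / fanOut) + Math.min(length % fanOut, 1))

    // Sizes of the segments that `grouped(groupSize)` produces for 1..length
    def segmentSizes(length: Int, fanOut: Int): List[Int] =
      (1 to length).grouped(groupSize(length, fanOut)).map(_.length).toList

    println(segmentSizes(10, 2)) // List(5, 5)
    println(segmentSizes(11, 2)) // List(6, 5)
    println(segmentSizes(7, 3))  // List(3, 3, 1)
    ```

    With `fanOut = 2`, an odd-length segment is therefore split into a slightly larger first half and a smaller second half.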


    Splitting the input into contiguous segments like this is not optimal. The actors calculating the product of the bigger numbers do much more work than the actors calculating the product of the smaller numbers (e.g. when calculating 1 * ... * 100, computing 1 * ... * 10 is faster than computing 90 * ... * 100). So it might be a good idea to shuffle the numbers first, so that big numbers are mixed with small numbers. This works in this case because multiplication is commutative. Parallel prefix calculation in general only needs an associative operation to work.
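    The effect of mixing big and small numbers can be checked without actors. The sketch below deals the numbers 1..n round-robin into `k` groups. This is a deterministic interleaving, swapped in as an alternative to the random shuffle suggested above; `scala.util.Random.shuffle` would serve a similar purpose. The bit length of each partial product serves as a rough proxy for the work per group. All helper names are hypothetical:

    ```scala
    // Split 1..n into k contiguous blocks: later blocks hold the bigger numbers
    def contiguous(n: Int, k: Int): Vector[Seq[Int]] = {
      require(n >= 1 && k >= 1)
      val size = (n + k - 1) / k // ceiling division
      (1 to n).grouped(size).toVector
    }

    // Deal 1..n round-robin into k groups, so every group mixes small and big numbers
    def interleaved(n: Int, k: Int): Vector[Seq[Int]] =
      (0 until k).map(i => (1 + i) to n by k).toVector

    // Bit length of each group's product: a rough proxy for the multiplication work
    def productBits(groups: Seq[Seq[Int]]): Seq[Int] =
      groups.map(g => g.foldLeft(BigInt(1))((acc, x) => acc * BigInt(x)).bitLength)

    // Difference between the most and the least expensive group
    def spread(bits: Seq[Int]): Int = bits.max - bits.min

    println(productBits(contiguous(100, 4)))  // increases from the first block to the last
    println(productBits(interleaved(100, 4))) // much closer together
    ```

    Both partitions multiply the same numbers, so the final product is identical. Only the distribution of work across the groups changes.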


    In theory

    For small amounts of data, the actor solution performs worse than the "naive" solution (using parallel collections). The actor solution will shine when you make complex calculations or distribute your calculation to specialized hardware (e.g. a graphics card or an FPGA) or to multiple machines. With actors you can control who does which calculation, and you can even restart "hanging" calculations. This can give a big speedup.

    On a single machine, the actor solution might help when you have a non-uniform memory access (NUMA) architecture. You could then organize the actors in a way that pins memory to a certain processor.

    Some measurements

    I took some real performance measurements using a Scala worksheet in IntelliJ IDEA.

    First I set up the actor system:

    // Setup the actor system
    val system = ActorSystem("root")
    // Start one calculation actor
    val calculationStart = Props(classOf[ParallelPrefixActor[BigInt]])
    val calcolon = system.actorOf(calculationStart, "Calcolon-BigInt")
    val inbox = Inbox.create(system)

    Then I defined a helper method to measure time:

    // Helper function to measure time
    def time[A] (id : String)(f: => A): A = {
      val start = System.nanoTime()
      val result = f
      val stop = System.nanoTime()
      println(s"""Time for "${id}": ${(stop-start)*1e-6d}ms""")
      result
    }

    And then I ran the measurements:

    // Test code
    val limit = 10000
    def testRange = (1 to limit).map(BigInt(_))
    time("par product")(testRange.par.product)
    val timeOut = FiniteDuration(240, TimeUnit.SECONDS)
    inbox.send(calcolon, Calculate[BigInt]((1 to limit).map(BigInt(_)), 0, 10, _ * _))
    time("actor product")(inbox.receive(timeOut))
    time("par sum")(testRange.par.sum)
    inbox.send(calcolon, Calculate[BigInt](testRange, 0, 5, _ + _))
    time("actor sum")(inbox.receive(timeOut))

    I got the following results:

    > Time for "par product": 134.38289ms
      res0: scala.math.BigInt = 284625968091705451890641321211986889014805140170279923
      Time for "actor product": 1310.217247ms
      res2: Any = CalculationResult(28462596809170545189064132121198688901480514017027
    > Time for "par sum": 6.488620999999999ms
      res3: scala.math.BigInt = 50005000
    > Time for "actor sum": 657.752832ms
      res5: Any = CalculationResult(50005000,0)

    The actor version is much slower than the parallel collections: about 10 times slower for the product (1310 ms vs. 134 ms) and about 100 times slower for the sum (658 ms vs. 6.5 ms).