
Multiple Scala actors servicing one task


I need to process multiple data values in parallel ("SIMD"). I can use the java.util.concurrent APIs (Executors.newFixedThreadPool()) to process several values in parallel using Future instances:

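import java.util.concurrent.{Executors, Callable}
// Provides .asJava (scala.jdk.CollectionConverters in Scala 2.13+)
import scala.collection.JavaConverters._

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  // Typed as Callable[Int] so the private Process class does not leak into a public member
  val processes: IndexedSeq[Callable[Int]] =
    for (process <- 1 to 1000) yield new Process(process)

  // invokeAll expects a java.util.Collection, so convert explicitly
  val futures = executorService.invokeAll(processes.asJava)

  // Wait for futures
}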

How do I do the same thing using Actors? I do not believe that I want to "feed" all of the processes to a single actor because the actor will then execute them sequentially.

Do I need to create multiple "processor" actors with a "dispatcher" actor that sends an equal number of processes to each "processor" actor?


Solution

  • If you just want fire-and-forget processing, why not use Scala futures?

    import scala.actors.Futures._
    def example = {
      val answers = (1 to 4).map(x => future {
        Thread.sleep(x*1000)
        println("Slept for "+x)
        x
      })
      val t0 = System.nanoTime
      awaitAll(1000000,answers: _*)  // Number is timeout in ms
      val t1 = System.nanoTime
      printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
      answers.map(_()).sum
    }
    
    scala> example
    Slept for 1
    Slept for 2
    Slept for 3
    Slept for 4
    4.000 seconds elapsed
    res1: Int = 10
    

    Basically, all you do is put the code you want inside a future { } block, and it immediately returns a future. Apply the future to get its result (this blocks until the computation is done), or use awaitAll with a timeout to wait until all of them are done.


    Update: As of Scala 2.11, the way to do this is with scala.concurrent.Future. A translation of the above code is:

    import scala.concurrent._
    import duration._
    import ExecutionContext.Implicits.global
    
    def example = {
      val answers = Future.sequence(
        (1 to 4).map(x => Future {
          Thread.sleep(x*1000)
          println("Slept for "+x)
          x
        })
      )
      val t0 = System.nanoTime
      val completed = Await.result(answers, Duration(1000, SECONDS))
      val t1 = System.nanoTime
      printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
      completed.sum
    }
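
To keep the fixed-size thread pool from the question while still using scala.concurrent.Future, one option is to wrap the Java pool in an ExecutionContext instead of importing the global one. The following is only a sketch under assumptions: PooledFutures, process and runAll are hypothetical names, process stands in for the real time-consuming task, and the one-minute timeout is arbitrary.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PooledFutures {
  // Hypothetical stand-in for the time-consuming task in the question.
  def process(value: Int): Int = value * 2

  def runAll(values: Seq[Int]): Seq[Int] = {
    val threads = Runtime.getRuntime.availableProcessors
    val pool = Executors.newFixedThreadPool(threads)
    // Wrap the Java pool so that Future(...) schedules its work on it.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per value; sequence collects them in input order.
      val results = Future.sequence(values.map(v => Future(process(v))))
      Await.result(results, 1.minute) // arbitrary timeout for this sketch
    } finally pool.shutdown()
  }
}
```

Calling PooledFutures.runAll(1 to 1000) submits every value to the pool and returns the results in the same order as the input, so no separate dispatcher is needed to split the work evenly.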