Tags: scala, parallel-processing, executorservice, scalameter

Testing performance of parallel actions with ScalaMeter


I started from the basic example in ScalaMeter's example repository (https://github.com/scalameter/scalameter-examples). The code of the example is as follows:

measure method "map" in {
  using(ranges) in { r =>
    r.map(_ + 1)
  }
}

In my case I want to measure the speedup gained by running some actions in parallel. As a simple example, I split the range that has to be mapped into one part per core and use ExecutorService from Java 8 to run the parts in parallel. It looks like this:

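import java.util.concurrent.{Callable, Executors}
import scala.collection.JavaConverters._

val cores = Runtime.getRuntime.availableProcessors()
val pool = Executors.newFixedThreadPool(cores)

measure method "parallel map" in {
  using(ranges) in { r =>
    // Worker t handles elements t, t + cores, t + 2 * cores, ...
    // (assumes r has the form `0 until n`, as in the ScalaMeter example).
    val tasks = (0 until cores).map { t =>
      new Callable[Unit] {
        // Explicit Unit result type, so the mapped collection is discarded.
        override def call(): Unit = (t until r.end by cores).map(_ + 1)
      }
    }
    pool.invokeAll(tasks.asJava) // blocks until every task has finished
  }
}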

The problem is that although the parallel test finishes (the timing results are printed), the process never exits. As a consequence, if I change Bench.LocalTime to Bench.ForkedTime, even the results are gone. I am quite confused about what is going on. Any ideas?
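For reference, the striding scheme in the snippet above gives worker t the indices t, t + cores, t + 2 * cores, and so on. The following is a minimal sketch for checking that this split covers every index exactly once. It assumes the range has the form `0 until n`; `StrideCheck`, `stride` and `covered` are hypothetical names used only for illustration.

```scala
object StrideCheck {
  // Hypothetical helper: the indices worker `t` handles when `0 until n`
  // is split across `cores` workers.
  def stride(t: Int, cores: Int, n: Int): Range = t until n by cores

  // All indices handled by any worker, in ascending order.
  def covered(cores: Int, n: Int): Seq[Int] =
    (0 until cores).flatMap(t => stride(t, cores, n)).sorted
}
```

With 4 workers and n = 10, `StrideCheck.covered(4, 10)` yields every index from 0 to 9. Note that an exclusive upper bound of `r.last` (here 9) instead of `r.end` (here 10) would leave out the final element.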


Solution

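  • OK, this turned out to be pretty trivial: I had forgotten to call shutdown() on the pool. The worker threads of a fixed thread pool are non-daemon threads, so they keep the JVM alive after the benchmark itself has finished. After creating the pool inside the test and calling shutdown() after invokeAll, the test looks like this: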

    measure method "parallel map" in {
      using(ranges) in { r =>
        val pool = Executors.newFixedThreadPool(cores)
        val tasks = (0 until cores).map { t =>
          new Callable[Unit] {
            override def call(): Unit = (t until r.end by cores).map(_ + 1)
          }
        }
        pool.invokeAll(tasks.asJava)
        pool.shutdown()
      }
    }
    

    The only drawback is that the measurement now covers not just the action itself but also the creation and shutdown of the ExecutorService. I would guess that this overhead does not change the measured times much.
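If the pool overhead should stay out of the measurement, the pool can be created before the clock starts and shut down after it stops. The following is a minimal sketch of that idea using only the JDK and plain `System.nanoTime` timing rather than ScalaMeter; `PoolLifecycle`, `parallelMap` and `timed` are hypothetical names, and the range is assumed to be `0 until n`.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}

object PoolLifecycle {
  // Hypothetical helper: maps one strided chunk per worker on an existing
  // pool and returns the sum of all mapped elements.
  def parallelMap(pool: ExecutorService, n: Int, workers: Int): Int = {
    val tasks = new java.util.ArrayList[Callable[Int]]()
    for (t <- 0 until workers) {
      tasks.add(new Callable[Int] {
        override def call(): Int = (t until n by workers).map(_ + 1).sum
      })
    }
    // invokeAll blocks until every task has completed.
    val futures = pool.invokeAll(tasks).iterator()
    var total = 0
    while (futures.hasNext) total += futures.next().get()
    total
  }

  // Returns the result and the elapsed nanoseconds of the parallel work only.
  def timed(n: Int): (Int, Long) = {
    val workers = Runtime.getRuntime.availableProcessors()
    val pool = Executors.newFixedThreadPool(workers) // created before timing
    try {
      val start = System.nanoTime()
      val result = parallelMap(pool, n, workers)
      (result, System.nanoTime() - start)
    } finally {
      // Without shutdown() the non-daemon workers keep the JVM alive.
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

The `try`/`finally` ensures the pool is shut down even if a task throws, so the process can still exit.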


    Later I found a simpler, more idiomatic Scala way to do the above. You can build the tasks as a list of functions (a list of Callables would work as well) and then run all of them through a parallel collection. The code looks as follows:

    measure method "parallel map" in {
      using(ranges) in { r =>
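        // One zero-argument function per element; r is assumed to be `0 until n`.
        val tasks = (0 until cores).flatMap { t =>
          (t until r.end by cores).map(i => () => i + 1)
        }
        // On Scala 2.13+, .par requires the scala-parallel-collections module.
        tasks.par.map(_.apply())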
      }
    }
    

    or even simpler, because the list of tasks no longer depends on the number of cores:

    val tasks = r.map(i => () => i + 1)
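Where parallel collections are not available, a similar effect can be had with standard-library Futures. This is a swapped-in alternative, not the approach from the answer above: a minimal sketch that assumes the range is `0 until n`, with `FutureMap` and `parallelMap` as hypothetical names. The global execution context runs its work on daemon threads, so no explicit shutdown is needed for the JVM to exit.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureMap {
  // Splits `0 until n` into `chunks` strided pieces, maps each piece on the
  // global execution context, and blocks until every piece is done.
  def parallelMap(n: Int, chunks: Int): Seq[Int] = {
    val pieces = (0 until chunks).map(t => t until n by chunks)
    val work = Future.traverse(pieces)(p => Future(p.map(_ + 1)))
    Await.result(work, 30.seconds).flatten
  }
}
```

The results come back grouped by piece, so they are not in the original element order; sort or reassemble them if the order matters.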