I am new to the cats-effect library, and I am running into an issue with parallel execution. I have an application that I think would benefit, but when I test the idea on a toy construct, I can't seem to see a difference in execution time. I feel like I must be missing something obvious to others, so I thought I'd try my luck. In the code below, I have two implementations of summation across sequences of numbers (addInSequence
and addInParallel
), both executed in the the run()
function. When I do run the program, I note that they have virtually identical run times. Am I missing something obvious?
import cats.Monoid
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
case class Result[A](value: A, secondsElapsed: Double)
object Result {
def total[A](results: Seq[Result[A]])(implicit mon: Monoid[A]): Result[A] = {
val out: Result[A] = results.foldLeft(Result.empty[A]) {
(out: Result[A], next: Result[A]) =>
val newValue: A = mon.combine(out.value, next.value)
val aggTime: Double = out.secondsElapsed + next.secondsElapsed
Result(newValue, aggTime)
}
out
}
def empty[A](implicit mon: Monoid[A]): Result[A] = Result(mon.empty, 0d)
implicit val intAddMon: Monoid[Int] = new Monoid[Int] {
override def empty: Int = 0
override def combine(x: Int, y: Int): Int = x + y
}
}
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIt[A](op: => A): Result[A] = {
val start: Double = System.nanoTime / 1e9
val out: A = op
val stop: Double = System.nanoTime / 1e9
Result(out, stop - start)
}
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val partialSums: Seq[Result[Int]] = Seq(first, second, third).map( ns => timeIt(slowAdd(ns)) )
IO(Result.total(partialSums))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val ioSeq: List[IO[Result[Int]]] = List(first, second, third).map( ns => IO(timeIt(slowAdd(ns))) )
val sums: IO[List[Result[Int]]] = ioSeq.parSequence
for {
partialSums <- sums
} yield Result.total(partialSums)
}
override def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
It seems like I should see a reduction to one third of the runtime, but I must somehow being limiting the ability to execute in parallel. However, the docs do not seem to suggest any additional set up is required, nor have any of the other examples I have encountered. Any ideas would be greatly appreciated.
Two things:
Parallel operations are not guaranteed to be faster always. If your sequential operation is short, then overhead from the dispatch to multiple threads and later gathering all results might be greater than the speedup.
Take a look at what you are measuring. You have one sequential operation that does X amount of work, or 3 operations that do X/3 amount of work. You measure them all and then you compare: time of running X sequentially vs total time of running X/3 amount of work in 3 tasks. If sequential run took about 3 seconds, and each parallel run took about 1 second, by that logic both versions take 3 seconds. Which might be true of we measure CPU usage time, but not quite if we measure time from beginning of all that work to the finish.
If I run your code I get
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.058612958004232)), (Parallel,Result(30,12.005087116995128)))
However, if I run this code instead:
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIO[A](op: IO[A]): IO[Result[A]] = for {
start <- IO(System.nanoTime / 1e9)
out <- op
stop = System.nanoTime / 1e9
} yield Result(out, stop - start)
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
timeIO(IO(List(first, second, third).map(ns => slowAdd(ns)).sum))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
// I changed is as little as possible so that you would still see
// similarity to your code, but normally I would write
// .parTraverse(f) instead of .map(f).parSequence
timeIO(List(first, second, third).map(ns => IO(slowAdd(ns))).parSequence.map(_.sum))
}
def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
to measure what I believe you wanted to measure I receive this result:
@ ParallelMap.main(Array[String]())
List((Serial,Result(30,12.006349742005114)), (Parallel,Result(30,4.003020468982868)))
which shows that parallel computation was 3 times as fast as sequential.