Search code examples
scala parallel-processing n-queens

Why does parallel range processing take a lot more time than Future-based parallel processing (N-queens example)?


I came up with two parallel implementations that try to find one solution to the N-queens problem as fast as possible.

The first one uses Futures

import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

/**
  * Created by mikel on 17/06/16.
  *
  * Searches for one solution of the N-queens problem for `boardSize` queens by
  * starting one independent search, each in its own `Future`, per possible
  * position of the first queen. The first search that finds a solution wins.
  */
object Queens2 extends App {
  val time = System.currentTimeMillis()
  val boardSize = 200

  /**
    * Starts all searches and returns a future holding the first solution found.
    *
    * @return a future completed with the chosen position of every queen, in
    *         placement order, or failed with a `NoSuchElementException` once
    *         every search has ended without finding a solution
    */
  def firstResult(): Future[List[Int]] = {
    /**
      * Lazily enumerates every complete placement that extends `solution`.
      *
      * @param solution          queens placed so far as (placement index, position) pairs
      * @param remainingElements positions not yet used by a placed queen
      * @param invalidSum        `index + position` values (one diagonal direction) already taken
      * @param invalidMinus      `index - position` values (other diagonal direction) already taken
      */
    def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
      // The index of the queen being placed equals the number of queens already placed.
      val current = solution.size

      def isSafe(queen: Int): Boolean =
        !invalidSum.contains(current + queen) && !invalidMinus.contains(current - queen)

      if (current == boardSize)
        Stream(solution.toList)
      else
        for {
          nextQueen <- remainingElements.toStream if isSafe(nextQueen)
          res <- iterate(solution :+ ((current, nextQueen)), remainingElements - nextQueen, invalidSum + (current + nextQueen), invalidMinus + (current - nextQueen))
        } yield res
    }

    val promise = Promise[List[Int]]()
    val allElements = (0 until boardSize).toSet

    // HERE we parallelize the execution
    val searches = (0 until boardSize).map { pos =>
      Future {
        // headOption instead of head: a start position without any solution
        // yields an empty stream and must not throw.
        iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos))
          .headOption
          .foreach(found => promise.trySuccess(found.map(_._2)))
      }.recover {
        // An unexpected error in one search must not stop the others from winning.
        case scala.util.control.NonFatal(_) => ()
      }
    }

    // Once every search has ended, fail the promise if nobody completed it;
    // otherwise callers would wait forever (e.g. for board sizes 2 and 3).
    Future.sequence(searches).onComplete { _ =>
      promise.tryFailure(new NoSuchElementException(s"No solution found for board size $boardSize"))
    }

    promise.future
  }

  val resFuture = firstResult()
  resFuture.onComplete {
    case scala.util.Success(res) =>
      println(s"Finished in: ${System.currentTimeMillis() - time}")
      println(res)
      System.exit(0)
    case scala.util.Failure(cause) =>
      Console.err.println(s"Failed: ${cause.getMessage}")
      System.exit(1)
  }

  // Park the main thread on a promise that is never completed; the process
  // ends through System.exit in the callback above.
  Await.result(Promise().future, Duration.Inf)
}

The other one uses a ParRange

import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
  * Created by mikel on 17/06/16.
  *
  * Searches for one solution of the N-queens problem for `boardSize` queens.
  * The start positions of the first queen are distributed by iterating over a
  * parallel range (`.par`) inside a single `Future`.
  */
object Queens extends App {
  val time = System.currentTimeMillis()
  val boardSize = 200

  /**
    * Starts the parallel search and returns a future holding the first solution found.
    *
    * @return a future completed with the chosen position of every queen, in
    *         placement order, or failed with a `NoSuchElementException` when the
    *         whole range was processed without finding a solution
    */
  def firstResult(): Future[List[Int]] = {
    /**
      * Lazily enumerates every complete placement that extends `solution`.
      *
      * @param solution          queens placed so far as (placement index, position) pairs
      * @param remainingElements positions not yet used by a placed queen
      * @param invalidSum        `index + position` values (one diagonal direction) already taken
      * @param invalidMinus      `index - position` values (other diagonal direction) already taken
      */
    def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
      // The index of the queen being placed equals the number of queens already placed.
      val current = solution.size

      def isSafe(queen: Int): Boolean =
        !invalidSum.contains(current + queen) && !invalidMinus.contains(current - queen)

      if (current == boardSize)
        Stream(solution.toList)
      else
        for {
          nextQueen <- remainingElements.toStream if isSafe(nextQueen)
          res <- iterate(solution :+ ((current, nextQueen)), remainingElements - nextQueen, invalidSum + (current + nextQueen), invalidMinus + (current - nextQueen))
        } yield res
    }

    val promise = Promise[List[Int]]()
    Future {
      val allElements = (0 until boardSize).toSet

      // HERE we parallelize the execution
      val range = (0 until boardSize).par
      range.foreach { pos =>
        // headOption instead of head: a start position without any solution
        // yields an empty stream and must not throw.
        iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos))
          .headOption
          .foreach(found => promise.trySuccess(found.map(_._2)))
      }

      // foreach on the parallel range returns only after every start position
      // has been processed. If nobody completed the promise by then, fail it;
      // otherwise callers would wait forever (e.g. for board sizes 2 and 3).
      promise.tryFailure(new NoSuchElementException(s"No solution found for board size $boardSize"))
    }
    promise.future
  }

  val resFuture = firstResult()
  resFuture.onComplete {
    case scala.util.Success(res) =>
      println(s"Finished in: ${System.currentTimeMillis() - time}")
      println(res)
      System.exit(0)
    case scala.util.Failure(cause) =>
      Console.err.println(s"Failed: ${cause.getMessage}")
      System.exit(1)
  }

  // Park the main thread on a promise that is never completed; the process
  // ends through System.exit in the callback above.
  Await.result(Promise().future, Duration.Inf)
}

After executing both programs with a board of size 200, I get a solution much faster with the first approach (apparently the level of parallelization goes down in the second solution after some time). Does anybody know why this is happening?


Solution

  • So, this fixed version of your second snippet works fast enough:

      import java.util.concurrent.Executors
    
      import scala.collection.immutable.HashSet
      import scala.concurrent.duration._
      import scala.concurrent.{Await, ExecutionContext, Future, Promise}
      /**
        * Created by mikel on 17/06/16.
        *
        * Searches for one solution of the N-queens problem for `boardSize` queens.
        * The start positions of the first queen are distributed by iterating over a
        * parallel range (`.par`) inside a single `Future` that runs on a dedicated
        * executor; the result callback is registered on the global execution context.
        */
      object Queens extends App {
        val time = System.currentTimeMillis()
        val boardSize = 200

        /**
          * Starts the parallel search and returns a future holding the first solution found.
          *
          * @return a future completed with the chosen position of every queen, in
          *         placement order, or failed with a `NoSuchElementException` when the
          *         whole range was processed without finding a solution
          */
        def firstResult(): Future[List[Int]] = {
          /**
            * Lazily enumerates every complete placement that extends `solution`.
            *
            * @param solution          queens placed so far as (placement index, position) pairs
            * @param remainingElements positions not yet used by a placed queen
            * @param invalidSum        `index + position` values (one diagonal direction) already taken
            * @param invalidMinus      `index - position` values (other diagonal direction) already taken
            */
          def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
            // The index of the queen being placed equals the number of queens already placed.
            val current = solution.size

            def isSafe(queen: Int): Boolean =
              !invalidSum.contains(current + queen) && !invalidMinus.contains(current - queen)

            if (current == boardSize)
              Stream(solution.toList)
            else
              for {
                nextQueen <- remainingElements.toStream if isSafe(nextQueen)
                res <- iterate(solution :+ ((current, nextQueen)), remainingElements - nextQueen, invalidSum + (current + nextQueen), invalidMinus + (current - nextQueen))
              } yield res
          }

          // Dedicated executor for the driving Future, kept separate from the
          // global context on which the result callback is registered below.
          // NOTE(review): only one task is submitted to this pool in this file,
          // so 20 threads looks like more than needed; confirm before shrinking.
          val futureExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))
          val promise = Promise[List[Int]]()
          Future({
            val allElements = (0 until boardSize).toSet

            // HERE we parallelize the execution
            val range = (0 until boardSize).par
            range.foreach { pos =>
              // headOption instead of head: a start position without any solution
              // yields an empty stream and must not throw.
              iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos))
                .headOption
                .foreach(found => promise.trySuccess(found.map(_._2)))
            }

            // foreach on the parallel range returns only after every start position
            // has been processed. If nobody completed the promise by then, fail it;
            // otherwise callers would wait forever (e.g. for board sizes 2 and 3).
            promise.tryFailure(new NoSuchElementException(s"No solution found for board size $boardSize"))
          })(futureExecutor)
          promise.future
        }

        val resFuture = firstResult()
        resFuture.onComplete({
          case scala.util.Success(res) =>
            println(s"Finished in: ${System.currentTimeMillis() - time}")
            println(res)
            System.exit(0)
          case scala.util.Failure(cause) =>
            Console.err.println(s"Failed: ${cause.getMessage}")
            System.exit(1)
        })(scala.concurrent.ExecutionContext.Implicits.global)


        // Park the main thread on a promise that is never completed; the process
        // ends through System.exit in the callback above.
        Await.result(Promise().future, Duration.Inf)
      }
    

    As you can see, I've introduced separate executors for awaiting the result and for computing your future. To make this more obvious I made them explicit, but of course you may use implicits.

    The problem in your second snippet was thread pool exhaustion in the default execution context (scala.concurrent.ExecutionContext.Implicits.global), so your promise would not trigger until almost all of the computations had completed.

    ParRange uses the global context through its default TaskSupport:

    //...
    private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
    //...
    class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
    extends TaskSupport with ExecutionContextTasks