I came up with two parallel solutions to find as fast as possible one solution for the N queens problem.
The first one uses Futures
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens2 extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
val allElements = (0 until boardSize).toSet
val range = (0 until boardSize)
range.foreach(pos => {
// HERE we parallelize the execution
Future {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
}
)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
The other one uses a ParRange
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
Future {
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
}
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
After executing both programs with a 200 size board I get a much faster solution with the first approach (apparently the level of parallelization goes down in the second solution after some time), anybody knows why is this happening?
So, this fixed version of your second snippet works fast enough:
import java.util.concurrent.Executors
import scala.collection.immutable.HashSet
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val futureExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))
val promise = Promise[List[Int]]()
Future({
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
})(futureExecutor)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess({ case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
})(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(Promise().future, Duration.Inf)
}
As you can see, I've introduced separate executors for awaiting result and computing your future. To make it more obvious I made them explicit but of course you may use implicits.
The problem in your second snippet was thread pool exhausting in default execution context (scala.concurrent.ExecutionContext.Implicits.global
), so your promise will not trigger until almost all the computations completed.
ParRange
uses global context through default TaskSupport
:
//...
private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
//...
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks