I am trying to spawn many CPU-intensive jobs using Scala Futures. Because there are so many, I need to throttle the creation of these jobs (threads). To do that, I use:
import java.util.concurrent.{ ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit }
import scala.concurrent._

val numThread = sys.runtime.availableProcessors

implicit val context = ExecutionContext.fromExecutorService(
  new ThreadPoolExecutor(
    numThread, numThread,
    0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[ Runnable ]( numThread ) {
      override def offer( e: Runnable ) = {
        put( e ) // wait for an empty slot, throttling submission
        true
      }
    }
  )
)
To test this, I created two very simple functions:
import scala.util.{ Try, Success, Failure }
import scala.util.Random

def longComputation() = {
  val id = Thread.currentThread().getId
  //blocking {
  println( s"Started thread: $id" )
  Thread.sleep( 500 )
  println( s"Finished thread: $id" )
  //}
  id
}
def processResult[ T ]( r: Try[ T ] ) = {
  blocking {
    r match {
      case Success( id ) => println( s"Thread result: $id" )
      case Failure( t )  => println( "An error has occurred: " + t.getMessage )
    }
  }
}
I then run the test that executes the tasks via multi-threading:
def main( args: Array[ String ] ) {
  val s = Stream.from( 0 )
  //s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
  s.foreach { x =>
    println( x )
    val f = Future( longComputation )
    val p = Promise[ Long ]()
    p completeWith f
    p.future.onComplete { processResult }
  }
  println( "Finished" )
  context.shutdown
}
When I executed this, 16 threads were launched (the CPU count is 8). The program printed the "Finished" message, then the system locks up and nothing else is executed. If, however, I remove the callback, the threads execute ad infinitum as expected.
Above I have experimented with blocking and also with using a Promise; neither changes the behaviour. So my question is: how can I throttle the task execution without blocking the callbacks? If this is not possible, is it viable to do I/O in the threads (Futures)?
Appreciate any pointers.
The program is running into a deadlock. The thread pool provided is of a fixed size, so the following happens:

Future(longComputation) takes a thread from the pool and starts working. When it is done, the onComplete callback needs a thread from the same pool to execute the provided function.

Given that doing the work takes longer than processing its result, at some point all threads are busy doing work. One of them finishes, and its onComplete needs a thread as well, so it asks the executor for one; because the queue's blocking offer never rejects a submission, that request simply parks the worker. Work cannot finish because all threads are busy, and the machine stops in a deadlock.
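To make the cycle concrete, here is a minimal, self-contained sketch of the same mechanism; the object name, the pool size of 2 and the 200 ms sleep are illustrative choices, not values from the question. Every worker that finishes has to submit its callback through the same blocking queue it draws work from, so once the pool saturates nothing can make progress:

import java.util.concurrent.{ ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit }
import scala.concurrent._

object DeadlockDemo extends App {
  val poolSize = 2 // a tiny pool makes the lock-up appear almost immediately

  implicit val ctx = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      poolSize, poolSize, 0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[ Runnable ]( poolSize ) {
        override def offer( e: Runnable ) = { put( e ); true } // blocks the submitter
      }
    )
  )

  Stream.from( 0 ).foreach { i =>
    val f = Future { Thread.sleep( 200 ); i }   // the work occupies a pool thread
    f.onComplete( r => println( s"done: $r" ) ) // the callback needs a pool thread too
  }
  // Eventually every pool thread is parked in put(), trying to enqueue its callback,
  // and the main thread is parked in put(), trying to enqueue more work: deadlock.
}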
We can solve this producer-consumer deadlock by giving reserved resources to the consumer. That way, work is throttled by the fixed-size thread pool, but we ensure that any work that's finished can be further handled.
The snippet below, where I have renamed context to fixedContext, uses a separate execution context for processing the results, which resolves the deadlock. I also got rid of the Promise, which served no purpose other than proxying the future.
import java.util.concurrent.Executors

val fixedContext = ... // the bounded ThreadPoolExecutor context from the question
val singleThreadContext = ExecutionContext.fromExecutorService( Executors.newFixedThreadPool( 1 ) )
...
...
def main( args: Array[ String ] ) {
  val s = Stream.from( 0 )
  s.foreach { x =>
    println( x )
    val f = Future( longComputation )( fixedContext )
    f.onComplete { processResult }( singleThreadContext )
  }
  println( "Finished" )
  fixedContext.shutdown
}
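The key point is that the single-threaded context is reserved for the callbacks: a completed Future can always hand off its result, even when every thread in fixedContext is busy and the bounded queue is full. Submission of new work is still throttled by fixedContext; only the result handling bypasses it, so one extra thread is enough as long as processResult stays cheap.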