
Throttling Scala Future blocks when onComplete is used


I am trying to spawn many CPU intensive jobs using Scala Futures. Because there are so many, I need to throttle the creation of these jobs (threads). To do that I use:

import java.util.concurrent.{ ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit }
import scala.concurrent._

val numThread = sys.runtime.availableProcessors

// Fixed-size pool with a bounded queue of the same size
implicit val context: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      numThread, numThread,
      0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[ Runnable ]( numThread ) {
        // The executor calls offer() to enqueue a task; block here instead of rejecting it
        override def offer( e: Runnable ) = {
          put( e ); // Waiting for empty room
          true
        }
      })
     )

To test this I created 2 very simple functions:

import scala.util.{ Try, Success, Failure }
import scala.util.Random

def longComputation() = {
  val id = Thread.currentThread().getId
  //blocking {
    println( s"Started thread: $id" )
    Thread.sleep( 500 )
    println( s"Finished thread: $id" )
  //}
  id
}

def processResult[T](r : Try[T]) = {
  blocking {
      r match {
        case Success( id ) => println( s"Thread result: $id" )
        case Failure( t )  => println( "An error has occurred: " + t.getMessage )
       }
  }

}

I then run the test, which submits the tasks to the thread pool:

def main( args: Array[ String ] ): Unit = {

   val s = Stream.from( 0 )
   //s.foreach { x => println(x) ;  val f = Future( longComputation ) ; f.onComplete{ processResult } }

   s.foreach { x =>
     println(x)
     val f = Future( longComputation )
     // Experiment: proxy the future through a Promise before attaching the callback
     val p = Promise[Long]()
     p completeWith f
     p.future.onComplete{ processResult }
   }

   println("Finished")
   context.shutdown
}

When I executed this, 16 threads were launched (the CPU count is 8). The program printed the "Finished" message. It then hung and nothing else was executed. If, however, I remove the callback, the tasks are executed ad infinitum as expected.

As shown above, I have experimented with blocking and with using a Promise, with no change in behaviour. So my question is: how can I throttle the task execution without blocking the callbacks? If this is not possible, is it viable to do the I/O inside the Future itself?

Appreciate any pointers.


Solution

  • The program has run into a deadlock. The thread pool provided has a fixed size, so the following happens: Future(longComputation) takes a thread from the pool and starts working. When it is done, onComplete needs a thread from the same pool to execute the provided function.

    Given that doing the work takes longer than submitting it, at some point all threads are busy doing work. When any one of them finishes, its onComplete callback needs a thread as well, so it asks the executor for one. Because the queue is bounded and its offer blocks when it is full, that request cannot be satisfied: the callback cannot be accepted until a thread frees up, and no thread frees up until its callback is accepted. The program stops in a deadlock.

    We can solve this producer-consumer deadlock by reserving resources for the consumer. That way, the work is still throttled by the fixed-size thread pool, but any work that has finished can always be handled further.

    This snippet, in which I've renamed context to fixedContext, uses a separate context for processing the results, which solves the deadlock. I also got rid of the Promise, which served no real purpose other than proxying the future.

    import java.util.concurrent.Executors

    val fixedContext = // same as in question
    // Dedicated single-thread pool reserved for the callbacks
    val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    ...
    ...
    def main( args: Array[ String ] ) {
    
       val s = Stream.from( 0 )
    
       s.foreach { x => 
         println(x)
         val f = Future( longComputation )(fixedContext)  
         f.onComplete{ processResult }(singleThreadContext)
       }
    
       println("Finished")
       fixedContext.shutdown
     } 
    }
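
For anyone who wants to try the two-context approach end to end, here is a minimal, finite sketch of it. It is only an illustration under some assumptions: the names ThrottleDemo, run, callbackContext, handled and done are made up for this example and do not appear in the question, the job count is bounded so that the program terminates, and a short Thread.sleep stands in for the CPU-intensive work.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object ThrottleDemo {
  // Hypothetical helper: submits `jobs` tasks and returns how many callbacks ran
  // (or -1 if they did not all finish within the timeout).
  def run(jobs: Int, workers: Int): Int = {
    // Bounded pool whose queue blocks the submitter when full, as in the question.
    val fixedContext = ExecutionContext.fromExecutorService(
      new ThreadPoolExecutor(
        workers, workers, 0L, TimeUnit.SECONDS,
        new ArrayBlockingQueue[Runnable](workers) {
          override def offer(e: Runnable): Boolean = { put(e); true }
        }))
    // Separate pool reserved for callbacks; its queue is unbounded,
    // so submitting a callback never blocks a worker thread.
    val callbackContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

    val handled = new AtomicInteger(0)
    val done = new CountDownLatch(jobs)

    (1 to jobs).foreach { i =>
      // The work is throttled by the bounded pool...
      val f = Future { Thread.sleep(20); i }(fixedContext)
      // ...while the result is processed on the reserved pool.
      f.onComplete {
        case Success(_) => handled.incrementAndGet(); done.countDown()
        case Failure(_) => done.countDown()
      }(callbackContext)
    }

    // Wait for every callback before shutting the pools down.
    val finished = done.await(30, TimeUnit.SECONDS)
    fixedContext.shutdown()
    callbackContext.shutdown()
    if (finished) handled.get else -1
  }

  def main(args: Array[String]): Unit =
    println(s"Callbacks handled: ${run(jobs = 40, workers = 4)}")
}
```

The latch is there only so that the pools are shut down after the last callback has run; shutting the callback pool down earlier would cause late callbacks to be rejected.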