
Control main thread with multiple Future jobs in Scala


def fixture =
  new {
    val xyz = new XYZ(spark)
  }

val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] =
  scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]() // mutable list of futures, i.e. List[Future]

test("test case") {
  val tasks = for (i <- 1 to 10) {
    fList ++ scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]](Future {
      println("Executing task " + i)
      val ds = read(fixture.etlSparkLayer, i)
      ds
    })
  }

  Thread.sleep(1000 * 4200)
  val futureOfList = Future.sequence(fList) // list of Future jobs in a Future sequence
  println(Await.ready(futureOfList, Duration.Inf))

  val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
  println("Squares: " + await_result)

  futureOfList.onComplete {
    case Success(x) => println("Success!!! " + x)
    case Failure(ex) => println("Failed !!! " + ex)
  }
}

I am running one test case that builds a list of Futures and combines them with Future.sequence. I am trying to execute the same function several times in parallel using Future in Scala. On my system only 4 jobs start at a time: after those 4 jobs complete, the next 4 start, and so on until all jobs are done. How can I start more than 4 jobs at a time, and how can the main thread wait for all the Futures to complete? I tried Await.result and Await.ready but could not make the main thread wait, so I used Thread.sleep to control it. The program reads from an RDBMS table and writes to Elasticsearch. The main issue is: how do I control the main thread?


Solution

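  • Assuming that you use the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, note that its parallelism defaults to the number of available processors, which would explain why only 4 jobs run at a time on a 4-core machine. You can tune the number of threads as described here: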

    https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L100

    Specifically, the following system properties: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads, scala.concurrent.context.maxThreads, and scala.concurrent.context.maxExtraThreads.

    Otherwise, you can rewrite your code to something like this:

    import scala.collection.immutable
    import scala.concurrent.duration._
    import scala.concurrent._
    import java.util.concurrent.Executors
    import org.apache.spark.sql.{Dataset, Row}
    
    test("test case") {
      // Dedicated pool: up to NUMBEROFTHREADSYOUWANT reads run at the same time
      implicit val ec: ExecutionContextExecutorService =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))
      val aFuture = Future.traverse(1 to 10) { i =>
        Future {
          println("Executing task " + i)
          // If you run this on the global ExecutionContext instead, wrap the blocking read in `blocking { ... }`;
          // on a fixed thread pool like this one, `blocking` has no effect.
          read(fixture.etlSparkLayer, i)
        }
      }
      aFuture.onComplete(_ => ec.shutdownNow()) // Only for this test, and to make sure the pool gets cleaned up
      // Blocks the test (main) thread until all futures complete, or throws a TimeoutException
      val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // Or other timeout
      println("Squares: " + await_result)
    }
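As a sketch of the first option: the properties can be passed as JVM flags (for example `-Dscala.concurrent.context.numThreads=10 -Dscala.concurrent.context.maxThreads=10`; under sbt, JVM options only reach the tests when the tests are forked) or set programmatically, as long as this happens before anything uses the global ExecutionContext, because the global pool reads them once when it is created. maxThreads caps the parallelism and also defaults to the number of cores, so it has to be raised together with numThreads. The object and method names below are hypothetical, and the CountDownLatch is only a probe added for illustration: each job returns true if all jobs were running at the same time.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GlobalPoolSketch {
  // Hypothetical helper. It must run before anything touches ExecutionContext.global,
  // because the global pool reads these properties only once, when it is created.
  def configureGlobalPool(threads: Int): Unit = {
    System.setProperty("scala.concurrent.context.numThreads", threads.toString)
    // maxThreads caps the parallelism and defaults to the number of cores,
    // so it has to be raised as well
    System.setProperty("scala.concurrent.context.maxThreads", threads.toString)
  }

  // Starts `jobs` futures on the global pool. Each job returns true only if all
  // jobs were running at the same time (the latch reached zero within 5 seconds).
  def runOnGlobal(jobs: Int): List[Boolean] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val allStarted = new CountDownLatch(jobs)
    val all = Future.traverse((1 to jobs).toList) { _ =>
      Future {
        allStarted.countDown()
        // Deliberately not wrapped in `blocking { ... }`, so the pool cannot add
        // extra threads and only the configured parallelism is available
        allStarted.await(5, TimeUnit.SECONDS)
      }
    }
    // The calling ("main") thread waits here until every job has finished
    Await.result(all, 1.minute)
  }
}
```

Calling `GlobalPoolSketch.configureGlobalPool(10)` at the very start, before any Future is created, and then `GlobalPoolSketch.runOnGlobal(10)` should return ten `true` values even on a 4-core machine.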
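The fixed-pool approach can be tried without Spark. In this sketch `fakeRead` is a hypothetical stand-in for `read(fixture.etlSparkLayer, i)` that returns a String instead of a Dataset[Row], and a CountDownLatch serves as a probe (also an addition for illustration): with as many threads as jobs, every job sees all the others running at once, and the calling thread blocks in Await.result until all of them are done, so no Thread.sleep is needed.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FixedPoolSketch {
  // Hypothetical stand-in for read(fixture.etlSparkLayer, i)
  def fakeRead(i: Int): String = s"table-$i"

  // Runs `jobs` tasks on a fixed pool of `threads` threads. Each task returns its
  // result plus a flag that is true only if all tasks were running at the same time.
  def runAll(jobs: Int, threads: Int): Seq[(String, Boolean)] = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    val allStarted = new CountDownLatch(jobs)
    try {
      val all = Future.traverse((1 to jobs).toList) { i =>
        Future {
          allStarted.countDown()
          // true if every task reached this point within 5 seconds
          val sawAll = allStarted.await(5, TimeUnit.SECONDS)
          (fakeRead(i), sawAll)
        }
      }
      // The calling ("main") thread blocks here until every future has completed
      Await.result(all, 1.minute)
    } finally {
      ec.shutdown() // release the pool's threads once the results are in
    }
  }
}
```

With `threads` smaller than `jobs` (say 4 threads for 10 jobs) the tasks cannot all run at once, so at least the first ones to start wait out the 5-second timeout and report false.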
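Separately, the loop in the question never adds anything to fList: `++` on a mutable collection returns a new collection and leaves the original unchanged, and the loop discards that result. fList therefore stays empty, Future.sequence(fList) completes almost at once, and Await.ready/Await.result return straight away, which is probably why only Thread.sleep seemed to hold the main thread. The sketch below reproduces this with a ListBuffer of Future[Int] standing in for the MutableList of Future[Dataset[Row]] (both substitutions are assumptions so it runs without Spark, and the object name is hypothetical); appending with `+=` fixes it.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AppendSketch {
  // Same shape as the question's loop: `++` builds a new buffer that is thrown away
  def buggyList(): ListBuffer[Future[Int]] = {
    val fList = ListBuffer[Future[Int]]()
    for (i <- 1 to 10) {
      fList ++ ListBuffer(Future(i * i)) // result discarded, fList unchanged
    }
    fList
  }

  // Fixed: `+=` appends each future to fList in place
  def fixedList(): ListBuffer[Future[Int]] = {
    val fList = ListBuffer[Future[Int]]()
    for (i <- 1 to 10) {
      fList += Future(i * i)
    }
    fList
  }

  // Blocks the calling thread until every future in the list has completed
  def awaitAll(fList: ListBuffer[Future[Int]]): List[Int] =
    Await.result(Future.sequence(fList.toList), 1.minute)
}
```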