def fixture =
  new {
    val xyz = new XYZ(spark)
  }

val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] = scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]() // mutable List of future means List[Future]

test("test case") {
  val tasks = for (i <- 1 to 10) {
    fList ++ scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]](Future {
      println("Executing task " + i)
      val ds = read(fixture.etlSparkLayer, i)
      ds
    })
  }
  Thread.sleep(1000 * 4200)
  val futureOfList = Future.sequence(fList) // list of Future job in Future sequence
  println(Await.ready(futureOfList, Duration.Inf))
  val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
  println("Squares: " + await_result)
  futureOfList.onComplete {
    case Success(x) => println("Success!!! " + x)
    case Failure(ex) => println("Failed !!! " + ex)
  }
}
I am running a test case that builds a list of Futures and combines them with Future.sequence. The goal is to execute the same function several times in parallel using Scala Futures. On my system only 4 jobs start at a time; the next 4 start only after the first 4 have completed, and so on until all jobs are done. How can I start more than 4 jobs at a time, and how can I make the main thread wait until all the Futures have completed? I tried Await.result and Await.ready, but neither of them held the main thread, so I am currently using Thread.sleep to control it. The program reads from an RDBMS table and writes to Elasticsearch. The main issue is how to control the main thread.
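One thing worth checking in the snippet above, as a possible reason why Await returns without waiting: `++` builds a new collection and leaves `fList` unchanged, and its result is discarded, so `fList` is still empty when it is passed to Future.sequence, which then completes immediately. The sketch below illustrates the difference between `++` and `+=`. It uses ListBuffer and plain Ints as illustrative stand-ins (both are assumptions, not the original types).

```scala
import scala.collection.mutable.ListBuffer

object AppendDemo {
  // Returns (size after `++`, size after `+=`) for ten attempted appends.
  def sizes(): (Int, Int) = {
    val discarded = ListBuffer[Int]()
    for (i <- 1 to 10) {
      discarded ++ ListBuffer(i) // creates a new collection; `discarded` itself is untouched
    }

    val appended = ListBuffer[Int]()
    for (i <- 1 to 10) {
      appended += i // mutates `appended` in place
    }

    (discarded.size, appended.size)
  }

  def main(args: Array[String]): Unit =
    println(AppendDemo.sizes()) // prints (0,10)
}
```

Collecting the Futures with `for ... yield` or with Future.traverse avoids the mutable list altogether.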
Assuming that you use the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, you can tune the number of threads through the following system properties: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads, scala.concurrent.context.maxThreads, and scala.concurrent.context.maxExtraThreads.
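As a rough sketch of how these properties might be set from code: the global pool is created lazily and reads the properties when it is first created, so this only has an effect if it runs before anything touches the global ExecutionContext. The object name `GlobalPoolConfig` and the helper `configure` are hypothetical, introduced only for illustration.

```scala
object GlobalPoolConfig {
  // Sets the sizing properties for the default global pool.
  // Assumption: called before ExecutionContext.global is first used,
  // because the pool reads these values only once, when it is created.
  def configure(threads: Int): Unit = {
    val n = threads.toString
    System.setProperty("scala.concurrent.context.minThreads", n)
    System.setProperty("scala.concurrent.context.numThreads", n)
    System.setProperty("scala.concurrent.context.maxThreads", n)
  }

  def main(args: Array[String]): Unit = {
    configure(16)
    println(System.getProperty("scala.concurrent.context.numThreads")) // prints 16
  }
}
```

The same values can also be passed as JVM options, for example `-Dscala.concurrent.context.numThreads=16`, which avoids the ordering concern. In a Spark test the global pool may already have been initialized by other code, in which case a dedicated pool is the safer route.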
Otherwise, you can rewrite your code to something like this:
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors

test("test case") {
  // Dedicated pool, so the number of jobs running at once is explicit
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))
  val aFuture = Future.traverse(1 to 10) {
    i => Future {
      println("Executing task " + i)
      read(fixture.etlSparkLayer, i) // If this is a blocking operation you may want to consider wrapping it in a `blocking {}`-block.
    }
  }
  aFuture.onComplete(_ => ec.shutdownNow()) // Only for this test, and to make sure the pool gets cleaned up
  val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // Or other timeout
  println("Squares: " + await_result)
}
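To try the same pattern without Spark, here is a minimal self-contained sketch. `slowSquare` is a hypothetical stand-in for `read(fixture.etlSparkLayer, i)` that just sleeps and returns a number, and `FixedPoolDemo` and `runAll` are illustrative names. It runs the tasks on a fixed pool, blocks the calling thread with Await.result until all of them have finished, and records how many tasks were running at the same time.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object FixedPoolDemo {
  // Hypothetical stand-in for the real blocking read: sleeps, then returns i * i.
  def slowSquare(i: Int): Int = {
    Thread.sleep(200)
    i * i
  }

  // Runs taskCount tasks on a pool of poolSize threads.
  // Returns the results in input order and the peak number of concurrently running tasks.
  def runAll(poolSize: Int, taskCount: Int): (List[Int], Int) = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
    val lock = new Object
    var running = 0
    var peak = 0
    try {
      val all: Future[List[Int]] = Future.traverse((1 to taskCount).toList) { i =>
        Future {
          lock.synchronized { running += 1; if (running > peak) peak = running }
          try slowSquare(i)
          finally lock.synchronized { running -= 1 }
        }
      }
      // Blocks the calling thread until every task has completed (or the timeout expires).
      val results = Await.result(all, 1.minute)
      (results, lock.synchronized(peak))
    } finally ec.shutdown() // release the pool threads once the work is done
  }

  def main(args: Array[String]): Unit = {
    val (results, peak) = runAll(poolSize = 10, taskCount = 10)
    println(s"Results: $results, peak concurrency: $peak")
  }
}
```

The peak can never exceed the pool size, so raising `poolSize` is what allows more than 4 jobs to run at once. No Thread.sleep is needed in the caller, because Await.result only returns after every Future in the traversal has completed.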