scala, concurrency, java.util.concurrent

How can this be done concurrently in Scala?


So I have this chunk of code:

import scala.collection.mutable

// The map is declared outside the loop so it is still in scope afterwards
val map = mutable.Map[String, mutable.MutableList[String]]()
dbs.foreach { db =>
  val resultList = getTables(hive, db)
  map += (db -> resultList)
}

This loops through a list of dbs, runs a "show tables in db" call for each one, then adds the db -> tables entry to a map. How can this be done concurrently, given that each Hive query takes about 5 seconds to return?

Updated code:

def getAllTablesConcurrent(hive: JdbcHive, dbs: mutable.MutableList[String]): Map[String, mutable.MutableList[String]] = {
  implicit val context: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  // One Future per db; each yields an explicit (db, tables) pair
  val futures = dbs.map { db =>
    Future((db, getTables(hive, db)))
  }
  // Block until every query has finished, or fail after 10 seconds
  Await.result(Future.sequence(futures), Duration(10, TimeUnit.SECONDS)).toMap
}

Solution

  • If you want more control (how long to wait, how many threads to use, what happens when all threads are busy, etc.), you can use a ThreadPoolExecutor and Futures:

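      import java.util.concurrent.{Executors, TimeUnit}
      import scala.concurrent.{Await, ExecutionContext, Future}
      import scala.concurrent.duration.Duration

      // A fixed pool of 10 threads runs the queries
      implicit val context: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

      val dbs = List("db1", "db2", "db3")

      // One Future per db, each yielding a (name, tables) pair
      val futures = dbs.map { name =>
        Future((name, getTables(hive, name)))
      }

      // TIMEOUT is the maximum wait in milliseconds
      val result = Await.result(Future.sequence(futures), Duration(TIMEOUT, TimeUnit.MILLISECONDS)).toMap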
    

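    Just remember not to create a new ExecutionContext every time you need one. Create it once and reuse it; otherwise every call starts another thread pool that is never shut down.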
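
    As a rough sketch of that last point, the pool can live in a single object and be shared by every call. The names TableLoader, getAllTables and shutdown are made up for illustration, and getTables here is a stand-in that sleeps instead of querying Hive, so treat this as one possible arrangement under those assumptions rather than the definitive way to do it:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical holder object: the thread pool is created once, when the
// object is first used, and then shared by every call to getAllTables.
object TableLoader {
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

  // Stand-in for the real getTables(hive, db); it only simulates latency.
  def getTables(db: String): List[String] = {
    Thread.sleep(200)
    List(s"${db}_t1", s"${db}_t2")
  }

  // Starts one Future per db on the shared pool, then waits for all of them.
  def getAllTables(dbs: List[String], timeout: FiniteDuration): Map[String, List[String]] = {
    val futures = dbs.map(db => Future(db -> getTables(db)))
    Await.result(Future.sequence(futures), timeout).toMap
  }

  // Call once when the application exits so the pool threads are released.
  def shutdown(): Unit = ec.shutdown()
}
```

    Calling TableLoader.getAllTables(List("db1", "db2", "db3"), 5.seconds) repeatedly reuses the same ten threads. Because the pool's threads are non-daemon, calling TableLoader.shutdown() at exit lets the JVM terminate.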