scala jdbc scalikejdbc

Correct way of using multiple ConnectionPool(s)


In my application, I have to interact (read-only) with multiple MySQL DBs, one at a time. For each DB, I need a certain number of connections. Interactions with a DB do not occur in a single stretch: I query the DB, spend some time processing the results, query the DB again, process the results again, and so on.

Each of these interactions requires multiple connections (I fire multiple queries concurrently), so I need a ConnectionPool that is created when I start interacting with the DB and lives until I'm done with all queries to that DB (including the intervals in between, when I'm only processing results and not querying).


I'm able to create a ConnectionPool with the desired number of connections and obtain the implicit session as shown below:

def createConnectionPool(poolSize: Int): DBSession = {
 implicit val session: AutoSession.type = AutoSession

 ConnectionPool.singleton(
   url = "myUrl",
   user = "myUser",
   password = "***",
   settings = ConnectionPoolSettings(initialSize = poolSize)
 )

 session
}

I then pass this implicit session through all the methods that need to interact with the DB. That way, I'm able to fire poolSize queries concurrently using this session. Fair enough.

def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}

def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}

def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))

  // one Future per page, so up to poolSize queries run at the same time
  val resultsSequenceFuture: Seq[Future[ResultClass]] = {
    (0 until poolSize).map { i =>
      val limit: Long = fetchSize
      val offset: Long = i.toLong * fetchSize // widen before multiplying to avoid Int overflow

      Future(methodThatMakesSingleQuery(limit, offset))
    }
  }
  val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)

  // block until every page has been fetched (or the timeout expires)
  Await.result(resultsFutureSequence, 2.minutes)
}

There are 2 problems with this technique:

  1. My application is quite big and has many nested method calls, so passing the implicit session through all methods as shown above isn't feasible.
  2. In addition to these interactions with different DBs one at a time, I also need a single connection to another (fixed) DB throughout the lifetime of my entire application. This connection would be used for a small write operation (logging the progress of my interactions with the other DBs) every few minutes. Therefore, I need multiple ConnectionPools, one for each DB.

From what I could make out of ScalikeJdbc's docs, I came up with the following approach, which doesn't require me to pass the implicit session everywhere.

def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}

def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  (DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}

Although this works, I'm no longer able to parallelize the DB interaction. This behaviour seems obvious, since I'm using the borrow() method, which gets a single connection from the pool. This, in turn, makes me wonder why the AutoSession approach worked earlier: why was I able to fire multiple queries simultaneously using a single implicit session? And if that worked, why doesn't this? I can find no examples of how to obtain a DBSession from a ConnectionPool that supports multiple connections.


To sum up, I have 2 problems and 2 solutions, one for each problem. But I need a single (common) solution that solves both problems.

ScalikeJdbc's limited docs aren't offering a lot of help, and blogs / articles on ScalikeJdbc are practically non-existent. Please suggest the correct way or a workaround.


Framework versions

  • Scala 2.11.11
  • "org.scalikejdbc" %% "scalikejdbc" % "3.2.0"

Solution

  • Thanks to @Dennis Hunziker, I was able to figure out the correct way to release connections borrowed from ScalikeJdbc's ConnectionPool. It can be done as follows:

    import scalikejdbc.{ConnectionPool, using}
    import java.sql.Connection
    
    using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
        // use connection (only once) here
    }
    // connection automatically returned to pool
    

    With this, I'm now able to parallelize interaction with the pool.
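
A minimal sketch of how this could be parallelized, assuming a pool has already been registered under poolName via ConnectionPool.add. Each Future borrows its own connection and returns it when its block ends. The table my_table, its id column, and the helper names pageOffsets, fetchPage and fetchAll are hypothetical and only serve the illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scalikejdbc._

object ParallelReads {
  // Start offset of each page; widened to Long before multiplying.
  def pageOffsets(parallelism: Int, pageSize: Int): Seq[Long] =
    (0 until parallelism).map(i => i.toLong * pageSize)

  // Borrows one connection, runs one query, returns the connection to the pool.
  // `my_table` and `id` are hypothetical names.
  def fetchPage(poolName: String, limit: Long, offset: Long): Seq[Long] =
    using(DB(ConnectionPool.get(poolName).borrow())) { db =>
      db.readOnly { implicit session =>
        sql"select id from my_table order by id limit ${limit} offset ${offset}"
          .map(_.long(1)).list.apply()
      }
    }

  // Runs one fetchPage per offset on a dedicated thread pool and waits for all of them.
  def fetchAll(poolName: String, parallelism: Int, pageSize: Int): Seq[Long] = {
    val executor = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    try {
      val pages = pageOffsets(parallelism, pageSize).map { offset =>
        Future(fetchPage(poolName, pageSize.toLong, offset))
      }
      Await.result(Future.sequence(pages), 2.minutes).flatten
    } finally executor.shutdown() // release the worker threads once done
  }
}
```

Each call to fetchPage holds a connection only for the duration of one query, so the pool should allow at least parallelism connections; otherwise some borrows have to wait for a free one.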


    To solve my problem of managing several ConnectionPools and using connections across several classes, I ended up writing a ConnectionPoolManager, the complete code for which can be found here. By offloading the tasks of

    • creating pools
    • borrowing connections from pools
    • removing pools

    to a singleton object that I could use anywhere in my project, I was able to clear a lot of clutter and eliminate the need to pass an implicit session down chains of methods.


    EDIT-1

    While I've already linked the complete code for ConnectionPoolManager, here's a quick hint of how you can go about it.

    The following method of ConnectionPoolManager lets you borrow connections from ConnectionPools:

    def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
      // create a pool for db (only) if it doesn't exist
      addPool(dbName, poolNameOpt)
    
      val poolName: String = poolNameOpt.getOrElse(dbName)
      DB(ConnectionPool.get(poolName).borrow())
    }
    

    Thereafter, throughout your code, you can use the above method to borrow connections from pools and make your queries:

    def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
      ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
        // perform ScalikeJdbc SQL query here
      }
    }
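
For readers without access to the linked code, here is a rough sketch of what such a manager might look like. It is not the linked implementation: resolvePoolName, jdbcUrl, removePool, the poolSize parameter and the connection details are all assumptions made for illustration, and setting maxSize to the same value as initialSize is likewise an assumption about what the pool size should mean.

```scala
import scalikejdbc._

object ConnectionPoolManager {
  // The pool name falls back to the DB name when none is given.
  def resolvePoolName(dbName: String, poolNameOpt: Option[String] = None): String =
    poolNameOpt.getOrElse(dbName)

  // Hypothetical URL scheme; replace with real configuration lookup.
  private def jdbcUrl(dbName: String): String =
    s"jdbc:mysql://localhost:3306/$dbName"

  // Registers a pool for the DB only if one is not already registered.
  def addPool(dbName: String, poolNameOpt: Option[String] = None, poolSize: Int = 8): Unit =
    synchronized {
      val poolName = resolvePoolName(dbName, poolNameOpt)
      if (!ConnectionPool.isInitialized(poolName)) {
        ConnectionPool.add(
          poolName, jdbcUrl(dbName), "myUser", "***",
          ConnectionPoolSettings(initialSize = poolSize, maxSize = poolSize)
        )
      }
    }

  // Borrows one connection from the pool and wraps it in a DB.
  def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
    addPool(dbName, poolNameOpt)
    DB(ConnectionPool.get(resolvePoolName(dbName, poolNameOpt)).borrow())
  }

  // Closes the pool and unregisters it, if it exists.
  def removePool(poolName: String): Unit = synchronized {
    if (ConnectionPool.isInitialized(poolName)) ConnectionPool.close(poolName)
  }
}
```

Guarding addPool with isInitialized inside a synchronized block keeps concurrent callers from registering the same pool twice.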