
Spark program gets stuck because of Jedis Pool


Following up on my previous question about Jedis and threading, I changed my code to use a JedisPool instead of a single Jedis instance. The program still gets stuck as the number of threads increases. I tried raising .setMaxIdle(8000) and .setMaxTotal(8000), which fixed it temporarily, but in later runs it got stuck again after some iterations. My guess is that the pool runs out of connections: I do close them, but it seems the threads are not releasing them.

Here is the updated version of my connection:

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
 
object redisOp{
  @transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
  def apply(set: RDD[Int]): Unit = {
    val spark = SparkConstructor()
    val sc = spark.sparkContext
    // initialize Parents and Ranks key-values
    val parents = set.map(i => ("p"+i, i.toString))
    val ranks = set.map(i => ("r"+i, 1.toString))
    sc.toRedisKV(parents) // using the spark-redis package here only, ignore it.
    sc.toRedisKV(ranks)
    log.warn("***Initialized Redis***")
 
  }
 
  val jedisConfig = new JedisPoolConfig()          // Check from here (object's values and variables)
  jedisConfig.setMaxIdle(8000)                    //TODO: a better configuration?
  jedisConfig.setMaxTotal(8000)
  lazy val pool = new JedisPool(jedisConfig, "localhost")

  def find(u: Long): Option[Long] = { // returns leader of the set containing u
    val r = pool.getResource
    val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
      Some(u)
    } else find(p.toLong))
    r.close()                                    // closing back to pool
    res
  }
// other methods are similar to find()...
}

Solution

  • The problem is in your recursion. Each call to find() makes the next recursive call before releasing its resource, so every frame on the call stack holds a connection until all the calls below it return. Eventually the deepest calls find the pool empty because the earlier calls are still holding their connections.

    So, release the resource before making the next recursive call.

    E.g.

      def find(u: Long): Option[Long] = { // returns leader of the set containing u
        val r = pool.getResource
        val rget = r.get(s"p$u")
        r.close()                                    // closing back to pool
        val res = Option(rget).flatMap(p => if (p.toLong == u) {
          Some(u)
        } else find(p.toLong))
        res
      }
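
The same idea can be made a little more robust. The following is only a sketch, not the original poster's code: `FindSketch`, `withResource`, `findLeader` and the `lookup` parameter are hypothetical names introduced here for illustration. It assumes that each lookup borrows a connection, reads one value, and returns the connection before the search continues, and it closes the resource in a `finally` so that an exception during the read does not leak a connection.

```scala
import scala.annotation.tailrec

object FindSketch {
  // Loan pattern: run `f` with the resource, then always close it,
  // even if `f` throws. Works for any AutoCloseable.
  def withResource[R <: AutoCloseable, A](acquire: => R)(f: R => A): A = {
    val r = acquire
    try f(r) finally r.close()
  }

  // Follows parent pointers until a node is its own parent.
  // `lookup` is a hypothetical stand-in for a single pooled read; it is
  // assumed to have returned its connection by the time it returns.
  @tailrec
  def findLeader(u: Long)(lookup: String => Option[String]): Option[Long] =
    lookup(s"p$u") match {
      case None                     => None          // key missing
      case Some(p) if p.toLong == u => Some(u)       // u is its own parent
      case Some(p)                  => findLeader(p.toLong)(lookup)
    }
}
```

With the `pool` from the question, the lookup could be written as `key => Option(FindSketch.withResource(pool.getResource)(_.get(key)))`, relying on Jedis being closeable. Because the recursive call is in tail position and no connection is held across it, at most one connection per thread is borrowed at any time, regardless of how long the parent chain is.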