Tags: multithreading, scala, apache-spark, redis, jedis

Trouble using Threading with Redis (Jedis) in Scala


I have a Scala program that uses threading (actually Spark) and Redis (via Jedis). I defined an object for my Redis operations, in which the connection is a lazy val. I need each thread to open its own connection to Redis and work with it in parallel.
The connection object:

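import redis.clients.jedis.Jedis

object redisOp {
  // created lazily on first use; the third argument is the timeout in milliseconds
  lazy val r = new Jedis("127.0.0.1", 6379, 30)
  // follow the chain p<u> -> value until a key maps to itself; None if a key is missing
  def find(u: Long): Option[Long] =
    Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) Some(u) else find(p.toLong))
  // and other functions
}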

With one thread it works well, but when multiple threads use it I get errors. At first I got Unknown reply: 4 in each thread, where "4" is a random character (redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:).
Then, from redis-cli, I tried config set timeout 30000 and 30000, because I had also seen redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream and sometimes redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: invalid multibulk length in the logs. Now, in some runs (when switching to 2 threads instead of 4), the program hangs in a stage forever with no errors. I checked the executors' logs in the Spark UI but couldn't find anything useful: https://pastebin.com/iJMeBD0D

I think the problem is in how I define and use the Redis connection. Also, please tell me the proper way to close the connection, if that is needed.


Solution

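  • A Jedis object is not thread-safe. When several threads share one instance, their commands and replies interleave on the same socket, which is what produces errors such as Unknown reply and Protocol error. You should use some sort of object/connection pooling in a multi-threaded environment. Jedis provides JedisPool for that purpose. More details can be found in the Jedis wiki.

    The basic idea is to get a Jedis object with JedisPool.getResource() and return it to the pool with Jedis.close() when you are done.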
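
    As a rough illustration of that idea, the object from the question could be restructured along these lines. This is only a sketch, not a drop-in fix: the names RedisOp, withJedis and shutdown are made up for the example, and the pool size (8) and timeout (2000 ms) are assumed values that you would tune for your own setup.

```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.annotation.tailrec

object RedisOp {
  // One pool per JVM. JedisPool is safe to share between threads;
  // the Jedis instances it hands out are not.
  private lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(8) // assumed value: at least the number of concurrent threads
    new JedisPool(config, "127.0.0.1", 6379, 2000) // timeout in ms (assumed value)
  }

  // Hypothetical helper: borrow a connection, run f, and always give it back.
  def withJedis[A](f: Jedis => A): A = {
    val jedis = pool.getResource
    try f(jedis)
    finally jedis.close() // for a pooled instance, close() returns it to the pool
  }

  // Same lookup as in the question, but the whole chain is followed
  // on a single borrowed connection.
  def find(u: Long): Option[Long] = withJedis { jedis =>
    @tailrec
    def loop(current: Long): Option[Long] =
      Option(jedis.get(s"p$current")) match {
        case None                           => None
        case Some(p) if p.toLong == current => Some(current)
        case Some(p)                        => loop(p.toLong)
      }
    loop(u)
  }

  // Optional: release all pooled connections when the application stops.
  def shutdown(): Unit = pool.close()
}
```

    Because the pool lives in a lazy val inside an object, each Spark executor JVM should end up creating its own pool the first time a task calls RedisOp.find, and every task thread then borrows a separate connection. Returning the connection in a finally block also covers the "how do I close it" part: you close each borrowed Jedis after use, and close the pool itself only at shutdown.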