I have a Scala program that uses threading (actually Spark) and Redis (via Jedis). I defined an object for my Redis operations, with a lazy val for the connection. I need each thread to open a connection to Redis and work with it in parallel.
The connection object:
object redisOp{
lazy val r = new Jedis("127.0.0.1",6379,30)
def find(u: Long): Option[Long] = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) Some(u) else find(p.toLong))
// and other functions
}
When I use it with one thread, it works well. But when multiple threads use it, I get errors. At first, I got Unknown reply: 4 in each thread, where "4" is a random character (redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:).
Then from redis-cli
I tried to set config set timeout 30000
and 30000
as I also saw a redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream
and sometimes redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: invalid multibulk length
in the logs as well. Now, in some runs (when switching to 2 threads instead of 4), the program hangs in a stage forever with no errors. I checked the Spark UI for the executors' logs but couldn't find anything useful: https://pastebin.com/iJMeBD0D
I think the problem is in how I define and use the connection to Redis. Also, please tell me the proper way to close the connection, if that is needed.
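A Jedis object is not thread-safe. When several threads share one instance, their requests and replies interleave on the same socket, which is consistent with the Unknown reply and protocol errors described in the question. You should use some form of connection pooling in a multi-threaded environment; Jedis provides JedisPool for that purpose. More details can be found in the Jedis Wiki.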
The basic idea is to get a Jedis object with JedisPool.getResource() and return it to the pool with Jedis.close().
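As a rough sketch of that borrow/return pattern applied to the object from the question (not a definitive implementation): the helper name withJedis and the inner loop function are made up for illustration, the host and port are copied from the question, and the default JedisPoolConfig is an assumption you will probably want to tune.

```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.annotation.tailrec

object redisOp {
  // One pool per JVM; lazy so it is created on first use in that JVM.
  // Host and port are taken from the question; pool settings are defaults (assumption).
  lazy val pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379)

  // Hypothetical helper: borrow a connection, run f, and always give it back.
  // close() on a pooled Jedis returns it to the pool instead of closing the socket.
  def withJedis[A](f: Jedis => A): A = {
    val jedis = pool.getResource
    try f(jedis) finally jedis.close()
  }

  // Same logic as the original find, but the whole lookup chain
  // runs on a single borrowed connection.
  def find(u: Long): Option[Long] = withJedis { r =>
    @tailrec
    def loop(cur: Long): Option[Long] = r.get(s"p$cur") match {
      case null                 => None       // key does not exist
      case p if p.toLong == cur => Some(cur)  // reached the root
      case p                    => loop(p.toLong)
    }
    loop(u)
  }
}
```

With this shape, each thread calling redisOp.find gets its own connection for the duration of the call, so replies can no longer be mixed up between threads. As for closing: individual connections are handed back by close() in the finally block, and the pool itself can be shut down once with pool.close() when the application exits.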