I have a use case where we stream events, and for each event I have to do some lookups. The lookup data is in Redis, and I am wondering what the best way to create the connections is. Each Spark Streaming job would run 40 executors, and I have 5 such streaming jobs, all connecting to the same Redis cluster. I am unsure which approach I should take to create the Redis connection:
1. Create a connection object on the driver and broadcast it to the executors (not sure if this really works, as I would have to make the object Serializable). Can I do this with broadcast variables?
2. Create a Redis connection for each partition. However, I have the code written this way:
val update = xyz.transform(rdd => {
  // on driver
  if (xyz.isNewDay) {
    .....
  }
  rdd
})
update.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    partition.foreach(Key_trans => {
      // perform some lookups logic here
    })
  })
})
So if I create a connection inside each partition, I would be creating a new connection for every partition of every RDD, that is, again on every batch.
Is there a way I can maintain one connection per partition and cache that object, so that I do not have to create connections again and again?
I can add more context/info if required.
1. Create a connection object on the driver and broadcast it to the executors (not sure if this really works, as I would have to make the object Serializable). Can I do this with broadcast variables?
Answer: No. Most connection objects are not serializable, because they hold machine-dependent state (such as an open socket) that is only meaningful in the process that created it.
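As a small illustration of the point above, the sketch below tries to Java-serialize a plain `java.net.Socket`, which is the kind of resource a Redis client typically wraps. It uses only the JDK and assumes Java serialization (Spark's default serializer); `SerializationCheck` and `isJavaSerializable` are names made up for this example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.net.Socket

object SerializationCheck {
  // Returns true if Java serialization accepts the object, false if it rejects it.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val socket = new Socket() // unconnected socket, so no network access is needed
    try {
      println(isJavaSerializable("a plain string")) // prints "true"
      println(isJavaSerializable(socket))           // prints "false"
    } finally {
      socket.close()
    }
  }
}
```

A client object that holds such a socket as a field runs into the same problem, which is why the connection has to be created on the executor instead of being shipped from the driver.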
2. Is there a way I can maintain one connection per partition and cache that object, so that I do not have to create connections again and again?
Answer: Yes. Create a connection pool and use it inside each partition. You can create a connection pool like this one: https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
and then use it in this style:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
See also the "Design Patterns for using foreachRDD" section of the Spark Streaming Programming Guide.
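To make the "static, lazily initialized pool" idea concrete, here is a minimal sketch that uses only the JDK. `FakeConnection`, `SimplePool` and `PoolDemo` are hypothetical names for illustration, not part of spark-redis or any Redis client; in a real job the stand-in connection would be replaced by the pool of whichever client you use (for example Jedis's `JedisPool`). The sketch is unbounded and does no health checks, so treat it as an outline of the pattern rather than a production pool.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real Redis client connection.
final class FakeConnection(val id: Int) {
  def lookup(key: String): String = s"value-for-$key" // placeholder for a Redis GET
}

// A Scala `object` is initialized once per JVM on first use, so each executor
// builds its own pool locally and nothing is serialized from the driver.
object SimplePool {
  private val idle = new ConcurrentLinkedQueue[FakeConnection]()
  private val created = new AtomicInteger(0)

  // Number of connections opened so far in this JVM.
  def createdCount: Int = created.get()

  // Reuse an idle connection if there is one, otherwise open a new one.
  def borrow(): FakeConnection = {
    val c = idle.poll()
    if (c != null) c else new FakeConnection(created.incrementAndGet())
  }

  def giveBack(c: FakeConnection): Unit = idle.offer(c)

  // Loan pattern: the connection goes back to the pool even if the body throws.
  def withConnection[A](body: FakeConnection => A): A = {
    val c = borrow()
    try body(c) finally giveBack(c)
  }
}

object PoolDemo {
  def main(args: Array[String]): Unit = {
    // Each inner Seq stands in for one partition of one micro-batch.
    val partitions = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))
    partitions.foreach { partition =>
      SimplePool.withConnection { conn =>
        partition.foreach(key => conn.lookup(key))
      }
    }
    println(SimplePool.createdCount) // prints "1": one connection served all partitions
  }
}
```

Inside a streaming job, the body of `rdd.foreachPartition` would call `SimplePool.withConnection { conn => partition.foreach(...) }` in the same way. Since the pool lives in the executor JVM, connections survive across batches, and the number of open connections per executor is bounded by how many tasks run concurrently on it rather than by the number of partitions processed.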