apache-spark, redis, spark-streaming, spark-streaming-kafka

How to create connection(s) to a Datasource in Spark Streaming for Lookups


I have a use case where we are streaming events, and for each event I have to do some lookups. The lookups are in Redis, and I am wondering what the best way to create the connections is. The Spark Streaming job would run 40 executors, and I have 5 such streaming jobs, all connecting to the same Redis Cluster. So I am confused about which approach I should take to create the Redis connection:

  1. Create a connection object on the driver and broadcast it to the executors (not sure if it really works, as I would have to make this object Serializable). Can I do this with broadcast variables?

  2. Create a Redis connection for each partition; however, I have the code written this way:

    val update = xyz.transform(rdd => {
      // on driver
      if (xyz.isNewDay) {
        .....
      }
      rdd
    })

    update.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        partition.foreach(Key_trans => {
          // perform some lookup logic here
        })
      })
    })

So now, if I create a connection inside each partition, it would mean that for every RDD, and for each partition in that RDD, I would be creating a new connection.

Is there a way I can maintain one connection per partition and cache that object, so that I would not have to create connections again and again?

I can add more context/info if required.


Solution

  • 1. Create a connection object on the driver and broadcast it to the executors (not sure if it really works, as I would have to make this object Serializable). Can I do this with broadcast variables?

    Answer - No. Most connection objects are not serializable, because of machine-dependent state (such as the underlying network socket) associated with the connection.
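
    For illustration, here is a minimal sketch of why this fails, assuming the Jedis client (the class, host, and port below are illustrative assumptions, not from the original post): a connection created on the driver gets captured in the task closure, and Spark cannot serialize it.

    import org.apache.spark.streaming.dstream.DStream
    import redis.clients.jedis.Jedis

    // Hedged sketch: a Jedis connection wraps an open socket, so it is not
    // serializable. Shipping it to executors (via the task closure or a
    // broadcast variable) typically fails with a "Task not serializable" /
    // NotSerializableException error.
    def badLookup(dstream: DStream[String]): Unit = {
      val jedis = new Jedis("redis-host", 6379) // created on the driver

      dstream.foreachRDD { rdd =>
        rdd.foreach { key =>
          jedis.get(key) // jedis is captured in the closure -> serialization error
        }
      }
    }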

    2. Is there a way I can maintain one connection per partition and cache that object, so that I would not have to create connections again and again?

    Answer - Yes. Create a connection pool and use it per partition. You can create a connection pool along the lines of https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
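
    If you do not want to depend on the whole spark-redis module, a minimal sketch of such a pool could look like this (it assumes the Jedis client; the object name matches the usage below, but the host, port, and pool size are illustrative, and the real ConnectionPool linked above keys pools per Redis endpoint):

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    // Lazily initialized, per-JVM (i.e. per-executor) pool of Redis connections.
    object ConnectionPool {
      private lazy val pool: JedisPool = {
        val config = new JedisPoolConfig()
        config.setMaxTotal(10) // cap the number of connections per executor
        new JedisPool(config, "redis-host", 6379)
      }

      def getConnection(): Jedis = pool.getResource

      // With recent Jedis versions, close() returns a pooled connection to its pool.
      def returnConnection(connection: Jedis): Unit = connection.close()
    }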

    and then use it

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    Please also check "Design Patterns for using foreachRDD" in the Spark Streaming programming guide.
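
    Since the original use case is lookups (enriching each event) rather than pushing records out, the same pooling idea also works with mapPartitions, so that one borrowed connection serves an entire partition per batch. A hedged sketch, assuming the stream carries String keys and a ConnectionPool object like the one above (connection.get is just an illustrative lookup):

    import org.apache.spark.streaming.dstream.DStream

    def enrichWithLookups(update: DStream[String]): DStream[(String, String)] =
      update.mapPartitions { partition =>
        val connection = ConnectionPool.getConnection()
        try {
          // Materialize the partition before returning the connection; otherwise
          // the lazy iterator would be consumed after the connection is released.
          partition.map(key => (key, connection.get(key))).toList.iterator
        } finally {
          ConnectionPool.returnConnection(connection)
        }
      }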