Tags: java, apache-spark, spark-streaming, hazelcast

Initiate JDBC connection once in Spark Streaming job


Environment

  • Spark Streaming job reading from Kafka with a 30-second micro-batch interval (Durations.seconds(30))
  • In-memory store (Hazelcast) holding reference state; the state is not static and is updated in real time by the Spark workers
  • Spark workers connect to Hazelcast

Current Approach: foreachRDD is used to operate on the data, and a connection is established for every micro-batch (RDD), i.e. every 30 seconds (Durations.seconds(30)).

kafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> microBatch) throws Exception {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("myHost:5701");    // Define the connection
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        // Do processing
    }
});

The Ask: How can a connection be opened once on each Spark worker (when the job is submitted), instead of a new connection being created for each micro-batch?


Solution

  • What you need is well explained here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    Inside foreachPartition, the body is executed locally on the executors. There you can keep a static client connection (each worker JVM will reuse its own static object, for example), as in the sketch below.
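
    For the Java API used in the question, a minimal sketch of that idea might look like the following. HazelcastClientHolder is a hypothetical helper class (not part of Spark or Hazelcast); it lazily creates one client per executor JVM on first use and reuses it across all subsequent micro-batches.

    import com.hazelcast.client.HazelcastClient;
    import com.hazelcast.client.config.ClientConfig;
    import com.hazelcast.core.HazelcastInstance;

    // Hypothetical helper: holds one Hazelcast client per executor JVM.
    // The class must be on the executors' classpath.
    public final class HazelcastClientHolder {
        private static HazelcastInstance client;

        public static synchronized HazelcastInstance get() {
            if (client == null) {
                ClientConfig clientConfig = new ClientConfig();
                clientConfig.addAddress("myHost:5701");   // address taken from the question
                client = HazelcastClient.newHazelcastClient(clientConfig);
            }
            return client;
        }

        private HazelcastClientHolder() {}
    }

    The streaming job then fetches the shared client inside foreachPartition instead of creating a new one per micro-batch:

    kafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> microBatch) throws Exception {
            microBatch.foreachPartition(new VoidFunction<Iterator<String>>() {
                @Override
                public void call(Iterator<String> records) throws Exception {
                    // Runs on the executor; the client is created once per JVM
                    // and reused by every later partition and micro-batch.
                    HazelcastInstance client = HazelcastClientHolder.get();
                    while (records.hasNext()) {
                        String record = records.next();
                        // Do processing with the shared client
                    }
                }
            });
        }
    });

    A HazelcastInstance is thread-safe, so sharing one per JVM is the usual pattern; shutting it down can be left to JVM exit or a shutdown hook.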

    I hope it helps.

    Thanks,