Current Approach
- Using foreachRDD to operate on the data; a new connection is established for every micro-batch (RDD). This happens every 30 seconds (Durations.seconds(30)).
kafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> microBatch) throws Exception {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("myHost:5701"); // Define connection
        // A new client is created here for every micro-batch
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        // Do processing
    }
});
The Ask: I would like to open the connection once on each Spark worker (when the job is submitted), instead of opening a new connection for each micro-batch. What is the right way to achieve this?
What you need is well explained here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
In foreachPartition, the body is executed locally on the executors. There you can keep a static client connection (for example, each worker's executor JVM will use its own copy of the static object).
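For the Java code in the question, a minimal sketch of such a static, lazily initialized holder could look like the following. FakeClient, ClientHolder and LazyClientDemo are hypothetical names used only for illustration; FakeClient stands in for HazelcastInstance so that the example runs without Spark or Hazelcast on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LazyClientDemo {

    // Hypothetical stand-in for an expensive client such as HazelcastInstance.
    static final class FakeClient {
        static final AtomicInteger CREATED = new AtomicInteger();
        final int id;

        FakeClient() {
            id = CREATED.incrementAndGet(); // counts how many clients were built
        }

        void send(String record) {
            // A real client would write the record to the cluster here.
        }
    }

    // Holds one client per JVM (that is, per executor), created on first use.
    static final class ClientHolder {
        private static FakeClient client;

        private ClientHolder() {}

        static synchronized FakeClient get() {
            if (client == null) {
                // Assumption: in the real job this line would call
                // HazelcastClient.newHazelcastClient(clientConfig).
                client = new FakeClient();
            }
            return client;
        }
    }

    // Mimics the body that would be passed to foreachPartition.
    static int processPartition(List<String> records) {
        FakeClient c = ClientHolder.get();
        for (String record : records) {
            c.send(record);
        }
        return c.id;
    }

    public static void main(String[] args) {
        int first = processPartition(Arrays.asList("a", "b"));
        int second = processPartition(Arrays.asList("c"));
        System.out.println("same client reused: " + (first == second));
        System.out.println("clients created: " + FakeClient.CREATED.get());
    }
}
```

In the real job, ClientHolder.get() would be called inside microBatch.foreachPartition(...), so the client is built on first use inside each executor rather than on the driver, and it never has to be serialized.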
I hope it helps.