Environment
Current Approach
- Using foreachRDD to operate on the data; a connection is established for every micro-batch (RDD). This happens every 30 seconds (Durations.seconds(30)).
kafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> microBatch) throws Exception {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("myHost:5701"); // Define connection
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        // Do processing
    }
});
The Ask: I would like to open the connection once on each Spark worker (when the job is submitted), instead of creating a new connection for each micro-batch. What is the right way to achieve this?
What you need is well explained here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
In foreachPartition, the body is executed locally on the executors, so you can keep a static client connection there (each executor will use its own static object, for example), as in the sketch below.
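For the Java API used in your question, a minimal sketch of that idea could look like the following (Java 8 lambdas assumed). HazelcastClientHolder is a hypothetical helper class, not part of Spark or Hazelcast; it lazily creates one Hazelcast client per executor JVM and reuses it across micro-batches, using the same connection details as in your snippet:

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;

public final class HazelcastClientHolder {
    private static HazelcastInstance client;

    // Lazily create the client the first time an executor needs it,
    // then hand back the same instance on every later call.
    public static synchronized HazelcastInstance get() {
        if (client == null) {
            ClientConfig clientConfig = new ClientConfig();
            clientConfig.addAddress("myHost:5701"); // same connection details as in the question
            client = HazelcastClient.newHazelcastClient(clientConfig);
        }
        return client;
    }
}

In the streaming job, foreachRDD on the driver only schedules the work; the holder is only touched inside foreachPartition, which runs on the executors:

kafkaStream.foreachRDD(microBatch ->
    microBatch.foreachPartition(records -> {
        // Runs on the executor; the client is created at most once per executor JVM
        HazelcastInstance client = HazelcastClientHolder.get();
        while (records.hasNext()) {
            String record = records.next();
            // Do processing with the shared client
        }
    })
);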
I hope it helps.
Thanks,