In my spark kinesis streaming application I am using foreachBatch to get the streaming data and need to send it to the drools rule engine for further processing.
My requirement is, I need to accumulate all json data in a list/ruleSession and send it for rule engine for processing as a batch at the executor side.
//Scala Code Example:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreachBatch(function)
.start()
query.awaitTermination()
val function = (batchDF: DataFrame, batchId: Long) => {
val ruleSession = kBase.newKieSession() //Drools Rule Session, this is getting created at driver side
batchDF.foreach(row => { // This piece of code is being run in executor.
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Getting a null pointer exception here as the ruleSession is not available in executor.
}
)
ruleHandler.processRule(ruleSession) // Again this is in the driver scope.
}
In the above code, the problem I am facing is: the function used in foreachBatch is getting executed at driver side and the code inside batchDF.foreach is getting executed at worker/executor side, and thus failing to get he ruleSession.
Is there any way to run the whole function at each executor side?
OR
Is there a better way to accumulate all the data in a batch DataFrame after transformation and send it to next process from within the executor/worker?
I think this might work ... Rather than running foreach, you could use foreachBatch or foreachPartition (or or a map version like mapPartition if you want return info). In this portion, open a connection to the drools system. From that point, iterate over the dataset within each partition (or batch) sending each to the drools system (or you might send that whole chunk to drools). In the foreachPartition / foreachBatch section, at the end, close the connect (if applicable).