I am having trouble using StreamingQueryListener to get the number of input rows (numInputRows) per batch. I am using
I get the correct count when write() is the only action, but as soon as I add other actions such as count() or isEmpty(), the reported number of input rows is wrong.
Any help is highly appreciated.
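The listener itself is not shown in the question. For reference, here is a minimal sketch of a listener that reads numInputRows from each progress event. The class name InputRowsListener, the running total and the register helper are illustrative assumptions, not the asker's code. The total is held in an AtomicLong because listener callbacks run on a separate listener-bus thread.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// Illustrative listener (hypothetical name): logs numInputRows for every
// completed micro-batch and keeps a running total across batches.
public class InputRowsListener extends StreamingQueryListener {
    // Updated from the listener-bus thread, read from other threads.
    private final AtomicLong totalInputRows = new AtomicLong();

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        // Nothing to do when the query starts.
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        StreamingQueryProgress progress = event.progress();
        long rows = progress.numInputRows();
        long total = totalInputRows.addAndGet(rows);
        System.out.println("batch " + progress.batchId()
                + ": numInputRows=" + rows + ", running total=" + total);
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // Nothing to do when the query stops.
    }

    public long totalInputRows() {
        return totalInputRows.get();
    }

    // Hypothetical helper: registers a new listener on the session's
    // StreamingQueryManager and returns it so the total can be inspected.
    public static InputRowsListener register(SparkSession spark) {
        InputRowsListener listener = new InputRowsListener();
        spark.streams().addListener(listener);
        return listener;
    }
}
```

Register it with InputRowsListener.register(spark) before calling start() on the streaming query, so the first batch is already observed.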
The code below works:
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    public void call(Dataset<Row> streamDataset, Long batchId) throws Exception { // ... write() only (omitted) ...
    }
}).start();
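This gives the wrong count:
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    public void call(Dataset<Row> streamDataset, Long batchId) throws Exception { // ... an extra action such as count() or isEmpty(), plus write() (omitted) ...
    }
}).start();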
Please ignore the write() code; in the real scenario the data is written to MySQL.
When you define more than one action, as in
Spark effectively creates two "separate" executions that each consume the same input data. However, the rows read by both are reported through the same onQueryProgress
event, because both executions are wrapped in the same foreachBatch call, i.e. they belong to the same micro-batch.
In your case, numInputRows will therefore be twice
the value returned by count().
The factor increases with the number of actions you run on the batch.
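A common mitigation, following the foreachBatch guidance in the Structured Streaming programming guide for writing to multiple locations, is to persist the batch Dataset before running several actions on it and unpersist it afterwards. The sketch below assumes that pattern also keeps numInputRows in line with count(): without caching, a 100-row batch processed with count() plus a write would be reported as 200 input rows, and with caching it should be reported as 100. This was not tested in the answer above, and if cached blocks are evicted Spark recomputes them, which would inflate the count again. The names PersistPerBatch and processBatch are hypothetical, and the MySQL write appears only as a commented placeholder.

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Sketch (hypothetical names): run every action of one micro-batch against a
// persisted copy, so that, as long as cached blocks are not evicted, the input
// is read once per micro-batch instead of once per action.
public class PersistPerBatch {

    public static long processBatch(Dataset<Row> batch, long batchId) {
        batch.persist();
        try {
            // First action: reads the source and fills the cache.
            if (batch.isEmpty()) {
                return 0L;
            }
            // Later actions should be served from the cache.
            long count = batch.count();
            // Placeholder for the real MySQL write from the question, e.g.
            // batch.write().mode("append").jdbc(url, table, connectionProperties);
            // (url, table and connectionProperties are hypothetical here)
            return count;
        } finally {
            // Release the cached blocks before the next micro-batch.
            batch.unpersist();
        }
    }

    public static StreamingQuery start(Dataset<Row> df) throws Exception {
        return df.writeStream()
                .outputMode("append")
                .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                    @Override
                    public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
                        processBatch(streamDataset, batchId);
                    }
                })
                .start();
    }
}
```

The try/finally ensures the batch is unpersisted even if one of the actions fails, so cached data from a failed batch does not linger.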