I am having trouble using StreamingQueryListener to determine the number of input rows. I am using
queryProgress.progress().numInputRows()
I get the correct count when the write is the only action, but as soon as I add another action such as df.count() or df.isEmpty(), the reported number of input rows is thrown off.
Any help is highly appreciated.
EDIT
The code below works:
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    @Override
    public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
        streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
    }
}).start();
This gives the wrong count:
df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    @Override
    public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
        streamDataset.count();
        streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
    }
}).start();
Note
Please ignore the write() code; in the real scenario the data is written to MySQL.
When you define more than one action, as in
streamDataset.count();
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
Spark creates two "separate" streams that each consume the same data. However, both streams call onQueryProgress. This happens at the same time because the two streams are wrapped in the same foreachBatch.
In your particular case you will therefore see twice as many rows in numInputRows compared to the output of count.
The factor increases with the number of actions you have.
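To make the multiplication concrete, here is a toy model in plain Java. It is not Spark code: the class InputRowCountModel, the rowsRead counter, and the helper methods are all hypothetical names invented for this sketch. It only assumes that each action re-reads the source, which is the behaviour described above, and shows that materialising the batch once keeps the counter at the real row count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Toy model only; none of these names exist in Spark.
public class InputRowCountModel {
    // Stand-in for the listener metric: total rows pulled from the source.
    static final AtomicLong rowsRead = new AtomicLong();

    // A lazy batch: every evaluation re-reads the source and bumps the counter.
    static Supplier<List<String>> lazyBatch(List<String> source) {
        return () -> {
            rowsRead.addAndGet(source.size());
            return new ArrayList<>(source);
        };
    }

    // Reads the batch once, then serves the same rows to every later action.
    static Supplier<List<String>> cached(Supplier<List<String>> batch) {
        List<String> rows = batch.get();
        return () -> rows;
    }

    // Runs the given number of actions on one 3-row batch and
    // returns how many rows were read from the source in total.
    static long rowsReadFor(int actions, boolean cache) {
        rowsRead.set(0);
        Supplier<List<String>> batch = lazyBatch(Arrays.asList("a", "b", "c"));
        if (cache) {
            batch = cached(batch);
        }
        for (int i = 0; i < actions; i++) {
            batch.get().size(); // each "action" evaluates the batch
        }
        return rowsRead.get();
    }

    public static void main(String[] args) {
        System.out.println(rowsReadFor(1, false)); // one action, no cache
        System.out.println(rowsReadFor(2, false)); // two actions, no cache
        System.out.println(rowsReadFor(2, true));  // two actions, cached
    }
}
```

In Spark itself, the analogous step would be to call streamDataset.persist() at the start of the foreachBatch body and streamDataset.unpersist() at the end, so that count() and write() share one evaluation of the batch. Whether that restores the expected numInputRows in your setup is worth verifying against your listener output.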