apache-spark, spark-streaming, spark-structured-streaming

Spark Structured Streaming gives an incorrect number of input rows when using StreamingQueryListener QueryProgressEvent


I am facing an issue when using StreamingQueryListener to get the number of input rows. I am using

queryProgress.progress().numInputRows()

I get the correct count when there is no other action apart from the write, but the moment I add other actions such as df.count() or df.isEmpty(), the reported number of input rows gets disrupted.
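
For reference, the listener is wired up roughly like this (a minimal sketch; the SparkSession variable spark and the println are just illustrative, not my actual code):

// requires org.apache.spark.sql.streaming.StreamingQueryListener
spark.streams().addListener(new StreamingQueryListener() {
  @Override
  public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) { }

  @Override
  public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgress) {
    // numInputRows = rows read from the source for this trigger
    System.out.println("numInputRows = " + queryProgress.progress().numInputRows());
  }

  @Override
  public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) { }
});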

Any help is highly appreciated.

EDIT

The code below works:

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    // single action per micro-batch: the write
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

This gives the wrong count:

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    // additional action before the write
    streamDataset.count();
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

Note

Please ignore the write() code; in the real scenario the data is written to MySQL.


Solution

  • When you define more than one action inside foreachBatch, as in

    streamDataset.count();
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
    

    Spark creates two "separate" streams that each consume the same data, and both streams call onQueryProgress. This happens at the same time because both are wrapped in the same foreachBatch.

    In your particular case you will therefore see twice as many rows in numInputRows compared to the output of count().

    The factor increases with the number of actions you run; a sketch of a common mitigation follows in the next bullet.
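
  • If both count() and the write are really needed, one option is the caching pattern the Structured Streaming programming guide suggests for reusing a foreachBatch dataset across multiple writes: persist the micro-batch so later actions reuse the cached data instead of consuming the input again. Applying it to your case is an assumption to verify, reusing the paths from the question:

    df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
      @Override
      public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
        // cache the micro-batch so count() and the write share one pass over the data
        streamDataset.persist();
        streamDataset.count();
        streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
        // release the cached data once the batch is done
        streamDataset.unpersist();
      }
    }).start();

    Whether this also brings numInputRows back to the expected value can depend on the source, so check it against your actual MySQL setup.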