Tags: apache-spark, apache-spark-sql, spark-streaming, metrics

How to get the number of written records in Spark Structured Streaming?


I have a few structured streams configured in a single Spark session. I need to know how many records were read and written by each stream. For example, I have these two streams:

  1. read-s3 -> transform -> write-s3
  2. read-s3 -> transform -> write-db

I know about using SparkListener().onTaskEnd(), but at that point I don't have the query name, and taskEnd.taskMetrics().outputMetrics().recordsWritten() is always 0, so it's not an option.
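For reference, a minimal sketch of that listener approach (Scala; `spark` is assumed to be an existing SparkSession):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch of the SparkListener approach described above. For the streaming
// file sink this reported 0 written records (the bug mentioned in the
// solution), and the task-level event carries no streaming query name.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, hence the Option wrapper
    Option(taskEnd.taskMetrics).foreach { m =>
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"recordsWritten=${m.outputMetrics.recordsWritten}")
    }
  }
})
```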

Another way is to use accumulators in dataset.map() to count records incrementally. But then it's not the number of written records, only the number of records about to be written (assuming the sink doesn't fail).

Besides that, I tried StreamingQueryListener (which I already use to get numInputRows), but I couldn't find any metric for the number of written records.
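For completeness, a sketch of the StreamingQueryListener approach (Scala). The per-trigger progress exposes numInputRows; newer Spark releases also expose event.progress.sink.numOutputRows, but no such written-records metric was available here:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // numInputRows is reported per trigger; a comparable written-records
    // metric was not available at the time of this question.
    println(s"query=${p.name} numInputRows=${p.numInputRows}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```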

Is there a way to get this kind of metric?


Solution

  • There was a bug in FileStreamSink that was fixed in version 2.3.1.

    As a workaround, I used accumulators in a map function to count the number of records just before writing to the sink, as sketched below.
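A minimal, self-contained sketch of that workaround (Scala; the paths, format, and trigger interval are placeholders, not from the original answer):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object CountWrittenRecords {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("count-written-records").getOrCreate()
    import spark.implicits._

    // Hypothetical S3 input; any streaming source works the same way.
    val lines = spark.readStream.textFile("s3a://bucket/input")

    // Driver-side accumulator incremented by every record that reaches
    // the last map before the sink. This counts records *about to be*
    // written; if the sink fails, the count can overshoot.
    val toBeWritten = spark.sparkContext.longAccumulator("to-be-written")

    val transformed = lines
      .map(_.toUpperCase)                       // stand-in for the real transform
      .map { rec => toBeWritten.add(1); rec }   // count just before the sink

    val query = transformed.writeStream
      .format("parquet")
      .option("path", "s3a://bucket/output")
      .option("checkpointLocation", "s3a://bucket/checkpoints")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    // toBeWritten.value is readable on the driver, e.g. from a
    // StreamingQueryListener.onQueryProgress callback.
    query.awaitTermination()
  }
}
```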