I have a few structured streams configured in a single Spark session, and I need to know how many records were read and written by each stream. For example, I have these two streams:
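The setup is roughly the following (a minimal sketch; the topic names, paths, and query names are placeholders, not my real configuration):

```scala
import org.apache.spark.sql.SparkSession

// Requires the spark-sql-kafka-0-10 package on the classpath.
val spark = SparkSession.builder().appName("metrics-demo").getOrCreate()

// First stream: Kafka source -> Parquet file sink
val query1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .writeStream
  .queryName("query1")
  .format("parquet")
  .option("path", "/data/out1")
  .option("checkpointLocation", "/data/chk1")
  .start()

// Second stream: same session, different topic and sink path
val query2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic2")
  .load()
  .writeStream
  .queryName("query2")
  .format("parquet")
  .option("path", "/data/out2")
  .option("checkpointLocation", "/data/chk2")
  .start()
```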
I know about SparkListener.onTaskEnd(), but at that point I don't have the query name, and taskEnd.taskMetrics().outputMetrics().recordsWritten() is always 0, so it's not an option.
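For reference, this is roughly what I tried (a sketch; the println is just for illustration):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Nothing on taskEnd ties the task back to a streaming query name,
    // and recordsWritten comes back 0 with the file sink on my version.
    val written = taskEnd.taskMetrics.outputMetrics.recordsWritten
    println(s"stage=${taskEnd.stageId} recordsWritten=$written")
  }
})
```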
Another option is to use accumulators in dataset.map() to count records incrementally. But then it is not the number of records actually written, only the number of records about to be written (assuming the sink does not fail).
Besides that, I tried StreamingQueryListener (which I already use to get numInputRows), but I couldn't find any metric for the number of written records.
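This is the listener I have (a sketch; on my Spark version the progress event reports input rows but, as far as I can tell, nothing about the sink's output rows):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // p.name and p.numInputRows work fine; p.sink.description identifies
    // the sink but exposes no written-record count on this version.
    println(s"${p.name}: numInputRows=${p.numInputRows}")
  }
})
```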
Is there a way to get these metrics?
There was a bug in FileStreamSink that was fixed in version 2.3.1.
As a workaround, I used accumulators in a map function to count the records just before they are written to the sink.
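A minimal sketch of that workaround (the accumulator name, source, and sink are placeholders):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Driver-side accumulator; use one per query if you run several streams.
val writtenByQuery1 = spark.sparkContext.longAccumulator("query1-written")

val input: Dataset[String] = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// Count each record as it passes through, immediately before the sink.
// Note: accumulators in transformations can overcount on task retries.
val counted = input.map { record =>
  writtenByQuery1.add(1)
  record
}

counted.writeStream
  .queryName("query1")
  .format("parquet")
  .option("path", "/data/out1")
  .option("checkpointLocation", "/data/chk1")
  .start()

// writtenByQuery1.value can then be read on the driver, e.g. from a
// StreamingQueryListener.onQueryProgress callback after each batch.
```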