Is there a way I can find how many records got processed into downstream delta table for each micro-batch. I've streaming job, which runs hourly once using trigger.once() with the append mode. For audit purpose, I want to know how many records got processed for each micro batch. I've tried the below code to print the count of records processed(shown in the second line).
ss_count=0
def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")
ss_count = micro_batch_df.count()
saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)
print(ss_count)
Streaming job will run without any issues but micro_batch_df.count() will not print any count.
Any pointers would be much appreciated.
Here is a working example of what you are looking for (structured_steaming_example.py):
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()
# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")
def batch_write(output_df, batch_id):
print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))
save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
.foreachBatch(batch_write)
.option('checkpointLocation', save_loc + "/_checkpoint")
.start(save_loc)
)
query.awaitTermination()
The sample parquet file is attached. Please put that in the data folder and execute the code using spark-submit
spark-submit --master local structured_steaming_example.py
Please put any sample parquet file under data folder for testing.