Tags: pyspark, spark-streaming, spark-structured-streaming

Total records processed in each micro batch spark streaming


Is there a way to find out how many records were processed into the downstream Delta table for each micro-batch? I have a streaming job that runs once an hour using trigger.once() in append mode. For audit purposes, I want to know how many records were processed in each micro-batch. I tried the code below to print the count of processed records (shown in the second line of the snippet).

ss_count = 0

def write_to_managed_table(micro_batch_df, batchId):
    # print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")
    ss_count = micro_batch_df.count()

saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)

print(ss_count)

The streaming job runs without any issues, but the count from micro_batch_df.count() is never printed.

Any pointers would be much appreciated.


Solution

  • Here is a working example of what you are looking for (structured_steaming_example.py):

    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("StructuredStreamTesting") \
        .getOrCreate()
    
    # Create DataFrame representing the stream of input
    df = spark.read.parquet("data/")
    lines = spark.readStream.schema(df.schema).parquet("data/")
    
    
    def batch_write(output_df, batch_id):
        print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))
    
    
    save_loc = "/tmp/example"
    query = (lines.writeStream.trigger(once=True)
             .foreachBatch(batch_write)
             .option('checkpointLocation', save_loc + "/_checkpoint")
             .start(save_loc)
             )
    query.awaitTermination()
    

    The sample parquet file is attached. Put it in the data folder and execute the code using spark-submit:

    spark-submit --master local structured_steaming_example.py
    


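  • A side note on why the original attempt printed 0: assigning to ss_count inside write_to_managed_table binds a new local variable instead of updating the module-level one, and print(ss_count) also runs before the batch has finished (no awaitTermination). The scoping half can be demonstrated with plain Python, no Spark required (a minimal illustration with made-up callback names):

    ```python
    ss_count = 0

    def callback_without_global(count):
        ss_count = count  # binds a *local* ss_count; the module-level one is untouched

    def callback_with_global(count):
        global ss_count   # explicitly rebind the module-level name
        ss_count = count

    callback_without_global(42)
    print(ss_count)  # -> 0, the local assignment was discarded

    callback_with_global(42)
    print(ss_count)  # -> 42
    ```

    Even with the global fix, the driver must wait for the stream to finish (query.awaitTermination()) before reading the counter.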
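  • Alternatively (a hedged sketch, not part of the answer above): Structured Streaming already records per-micro-batch metrics. After a trigger-once run completes, query.recentProgress (and query.lastProgress) expose one dictionary per micro-batch with fields such as batchId and numInputRows, which can serve the audit purpose without counting inside foreachBatch. The snippet below generates its own sample parquet data in a temporary directory so it is self-contained; the paths are placeholders:

    ```python
    import os
    import tempfile

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("ProgressMetricsExample")
             .master("local[1]")
             .getOrCreate())

    base = tempfile.mkdtemp()
    src = os.path.join(base, "data")
    dst = os.path.join(base, "out")

    # Write a small parquet dataset (3 rows) to stream from.
    spark.range(3).write.parquet(src)

    lines = spark.readStream.schema(spark.read.parquet(src).schema).parquet(src)

    query = (lines.writeStream.trigger(once=True)
             .format("parquet")
             .option("checkpointLocation", dst + "/_checkpoint")
             .start(dst))
    query.awaitTermination()

    # recentProgress holds per-micro-batch metrics, including
    # 'batchId' and 'numInputRows'.
    total = sum(p["numInputRows"] for p in query.recentProgress)
    print(total)
    ```

    This keeps the audit trail in Spark's own metrics rather than in application state, which also makes it easy to forward via a StreamingQueryListener if needed.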