apache-spark, parquet, spark-structured-streaming

Structured streaming performance and purging the parquet files


I am using Spark Structured Streaming to consume data from Kafka. I need to aggregate various metrics (say 6 of them) and write each out as Parquet files. I see a huge delay between metric 1 and metric 2: for example, if metric 1 was updated recently, metric 2 reflects data that is an hour old. How do I get these queries to run in parallel?

Also, the Parquet files I write are read by another application. How do I continuously purge old Parquet data? Should I use a separate application for that?

Dataset<Row> lines_topic = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .load();

Dataset<Row> data = lines_topic.select(
    functions.from_json(lines_topic.col("value").cast("string"), schema).alias(topics));
data = data.withWatermark(---).groupBy(----).count();

query = data.writeStream()
    .format("parquet")
    .option("path", ---)
    .option("truncate", "false")
    .outputMode("append")
    .option("checkpointLocation", checkpointFile)
    .start();

Solution

  • Since each query runs independently of the others, you need to ensure each one gets enough resources to execute. With the default FIFO scheduler, all triggers run sequentially rather than in parallel, which would explain the lag you're seeing between metrics.

    As described in the Spark Structured Streaming programming guide, you should set the FAIR scheduler on your SparkContext and then define a separate pool for each query.

    // Run streaming query1 in scheduler pool1
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
    df.writeStream.queryName("query1").format("parquet").start(path1)
    
    // Run streaming query2 in scheduler pool2
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
    df.writeStream.queryName("query2").format("orc").start(path2)
    
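    Note that pool names set with `setLocalProperty` only run concurrently when the scheduler mode is FAIR; the default is FIFO. A minimal sketch of the configuration involved, assuming a `fairscheduler.xml` allocation file (the pool weights and shares below are illustrative; pools named at runtime fall back to default settings if no file is supplied):

    spark-submit --conf spark.scheduler.mode=FAIR \
                 --conf spark.scheduler.allocation.file=/path/to/fairscheduler.xml ...

    <?xml version="1.0"?>
    <allocations>
      <pool name="pool1">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>1</minShare>
      </pool>
      <pool name="pool2">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>1</minShare>
      </pool>
    </allocations>
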

    As for purging old Parquet files, you may want to partition the output (e.g. by date) and then periodically delete old partitions as needed. If all the data is written to a single unpartitioned output path, there is no way to delete just the old rows.
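One way to implement that purge, as a sketch: if the writer partitions the output by a date column (e.g. `.partitionBy("date")`, producing directories like `date=2024-01-01`), a small standalone job using plain JVM file APIs can drop partitions older than a retention window. The `date=` layout, class name, and retention parameter here are assumptions; on HDFS or S3 you would use the Hadoop `FileSystem` API instead of `java.nio.file`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionPurger {

    // Delete partition directories named date=YYYY-MM-DD that are older than retentionDays.
    public static void purge(Path basePath, int retentionDays) throws IOException {
        LocalDate cutoff = LocalDate.now().minusDays(retentionDays);
        List<Path> entries;
        try (Stream<Path> s = Files.list(basePath)) {
            entries = s.collect(Collectors.toList());
        }
        for (Path dir : entries) {
            String name = dir.getFileName().toString();
            if (!name.startsWith("date=")) continue; // skip _spark_metadata, checkpoints, etc.
            try {
                LocalDate date = LocalDate.parse(name.substring("date=".length()));
                if (date.isBefore(cutoff)) {
                    deleteRecursively(dir);
                }
            } catch (DateTimeParseException ignored) {
                // not a date partition; leave it alone
            }
        }
    }

    // Walk the tree bottom-up (deepest paths first) so files go before their directories.
    private static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

Running this from a scheduled job (cron, or the other application that reads the files) keeps the output bounded without touching the streaming query's checkpoint.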