apache-spark coalesce spark-structured-streaming

What is the effect of 'coalesce' before 'partitionBy' in this streaming query?


I have a streaming query (Spark Structured Streaming) that receives data from a Kafka topic (two partitions), like this:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("record")).select("record.*")

I want to perform a simple aggregation, partition the result by date and hour, and then save it to Parquet files in HDFS, like this:

val aggregationQuery = df.withColumn("ROP", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
.withColumn("date", to_date(col("ROP")))
.withColumn("hour", hour(col("ROP")))
.withColumn("timestamp", current_timestamp())
.withWatermark("timestamp", "0 minutes")
.groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
.agg(count("attributes.RECORDID").as("NumRecords"))
.coalesce(2)

Output to Parquet:

aggregationQuery.writeStream
.format("parquet")
.trigger(Trigger.ProcessingTime("10 seconds"))
.partitionBy("date", "hour")
.option("path", "hdfs://cloudera-cluster:8020/user/spark/proyecto1")
.option("checkpointLocation", "hdfs://cloudera-cluster:8020/user/spark/checkpointfolder")
.outputMode("append")
.start()

As a result, I am getting a folder structure similar to this example:

         user/spark/proyecto1/date=2015-08-18/hour=20

Inside each folder I am getting two Parquet files per trigger during the streaming process.

I would like to understand what the 'coalesce' and 'partitionBy' operations are doing with my data, and whether there are any risks associated with this particular combination.

By the way, I have only 2 nodes in my cluster.


Solution

    • coalesce(2) can reduce the parallelism of the whole pipeline to 2, not just that of the write. Because coalesce does not introduce a shuffle boundary, the reduced partition count propagates back to the operators before it, so in practice it may be better to replace it with repartition(2).
    • partitionBy creates the directory structure you see, with the column values encoded in the path (as in date=2015-08-18/hour=20). It also removes the corresponding columns from the data files in the leaf directories. Because dates and hours have low cardinality, there is no particular risk in this case.
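
The suggestion in the first bullet could look roughly like the sketch below. It assumes Spark is on the classpath and that the input DataFrame already carries the "timestamp", "date" and "hour" columns built in the question. The object name RepartitionSketch and the method name aggregate are hypothetical, introduced only for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, window}

// Hypothetical wrapper, not part of the original question.
object RepartitionSketch {
  // `events` is assumed to be the question's DataFrame after the
  // withColumn("date"/"hour"/"timestamp") steps.
  def aggregate(events: DataFrame): DataFrame =
    events
      .withWatermark("timestamp", "0 minutes")
      .groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
      .agg(count("attributes.RECORDID").as("NumRecords"))
      // repartition adds a shuffle, so the aggregation above keeps its own
      // parallelism and only the stage that writes the files runs as 2 tasks.
      .repartition(2)
}
```

The trade-off is an extra shuffle of the aggregated rows. Since the aggregate is small here, that cost is likely minor, but it is worth measuring on the actual workload.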

    Combined, the two produce the observed directory structure and limit the number of files written to each leaf directory to at most two per trigger.
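
To see where the limit of two files comes from, here is a toy model in plain Scala. It is not Spark code. It assumes that each write task emits one file per distinct (date, hour) combination it holds, so a leaf directory receives at most one file per task. The names LeafFileModel, Row and filesPerLeaf are hypothetical.

```scala
// Toy model only: plain Scala collections standing in for Spark write tasks.
object LeafFileModel {
  // Hypothetical row shape mirroring the partition columns from the question.
  final case class Row(date: String, hour: Int, numRecords: Long)

  // Assumption: each task writes one file per distinct (date, hour) it holds.
  // Returns, for every leaf directory, how many files one trigger produces.
  def filesPerLeaf(tasks: Seq[Seq[Row]]): Map[String, Int] =
    tasks
      .flatMap(task => task.map(r => s"date=${r.date}/hour=${r.hour}").distinct)
      .groupBy(identity)
      .map { case (dir, hits) => dir -> hits.size }

  def main(args: Array[String]): Unit = {
    // Two tasks, as left by coalesce(2) or repartition(2).
    val tasks = Seq(
      Seq(Row("2015-08-18", 20, 5L), Row("2015-08-18", 21, 3L)),
      Seq(Row("2015-08-18", 20, 7L))
    )
    filesPerLeaf(tasks).foreach { case (dir, n) => println(s"$dir -> $n file(s)") }
  }
}
```

In this example both tasks hold rows for hour 20, so that directory gets two files, while hour 21 appears in only one task and gets one. With N tasks the upper bound per leaf directory and trigger is N under the stated assumption.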