How can I write files with repartition using Structured Streaming's writeStream?


I have Structured Streaming code that reads data from Kafka and writes it to HDFS. When writing, I partition the data by three columns. The problem is that many small files are generated in each batch. I want each batch to produce only one file per partitionBy directory. I am not sure how to apply repartition in this scenario, as it doesn't seem to work.

        # repartition(1) does not seem to take effect on the write below
        query = df.selectExpr("CAST(value as STRING)") \
                .repartition(1) \
                .writeStream.partitionBy('host', 'dt', 'h') \
                .format("parquet") \
                .outputMode("append") \
                .option("checkpointLocation", self.checkpoint_location) \
                .option('path', self.hdfs_path) \
                .option('failOnDataLoss', 'false') \
                .option("startingOffset", "earliest") \
                .trigger(processingTime='2 seconds').start()

I don't want to write another cleanup job that reads the data from the path, repartitions it and stores the data with the desired number of files in each partition.


Solution

  • I ran some tests using repartition and it seems to work for me. I created a test Kafka topic that holds string data in the format id-value. In the streaming code I split value on - and write the data with partitionBy('id') to mimic your code's behavior. I am using Kafka broker 0.10 and Spark version 2.4.3.

    See the code below:

    from pyspark.sql.functions import col, split
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "partitionTestTopic") \
      .option("startingOffsets", "earliest") \
      .load()
    

    Use repartition(1):

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .withColumn('id', split(col('value'), '-').getItem(0)) \
      .repartition(1) \
      .writeStream.partitionBy('id') \
      .format("parquet") \
      .outputMode("append") \
      .option('path', 'test-1/data') \
      .option("checkpointLocation", "test-1/checkpoint") \
      .trigger(processingTime='20 seconds') \
      .start()
    

    Output:

    ├── id=1
    │   └── part-00000-9812bd07-3c0f-442e-a80c-5c09553f20e8.c000.snappy.parquet
    ├── id=2
    │   └── part-00000-522e99b6-3900-4702-baf7-2c55819b775c.c000.snappy.parquet
    ├── id=3
    │   └── part-00000-5ed9bef0-4941-451f-884e-8e94a351323f.c000.snappy.parquet
    

    Use repartition(3):

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .withColumn('id', split(col('value'), '-').getItem(0)) \
      .repartition(3) \
      .writeStream.partitionBy('id') \
      .format("parquet") \
      .outputMode("append") \
      .option('path', 'test-3/data') \
      .option("checkpointLocation", "test-3/checkpoint") \
      .trigger(processingTime='20 seconds') \
      .start()
    

    Output:

    ├── id=1
    │   ├── part-00000-ed6ed5dd-b376-40a2-9920-d0cb36c7120f.c000.snappy.parquet
    │   ├── part-00001-fa64e597-a4e1-4ac2-967f-5ea8aae96c13.c000.snappy.parquet
    │   └── part-00002-0e0feab8-57d8-4bd2-a94f-0206ff90f16e.c000.snappy.parquet
    ├── id=2
    │   ├── part-00000-c417dac5-271f-4356-b577-ff6c9f45792e.c000.snappy.parquet
    │   ├── part-00001-7c90eb8a-986a-4602-a386-50f8d6d85e77.c000.snappy.parquet
    │   └── part-00002-0e59e779-84e8-4fcf-ad62-ef3f4dbaccd5.c000.snappy.parquet
    ├── id=3
    │   ├── part-00000-8a555649-1141-42fe-9cb5-0acf0efc5997.c000.snappy.parquet
    │   ├── part-00001-ce4aaa50-e41b-4f5f-837c-661459b747b8.c000.snappy.parquet
    │   └── part-00002-9f95261e-bd4c-4f1e-bce2-f8ab3b8b01ec.c000.snappy.parquet
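
The difference between the two listings can be reproduced without Spark. The sketch below is a simplified model, not Spark's actual writer: it assumes rows are dealt round-robin across tasks (roughly what repartition(n) without column arguments does) and that each task writes one file into every partition directory it holds rows for. The function name and the sample batch are made up for illustration.

```python
from collections import defaultdict


def simulate_files_per_dir(ids, num_tasks):
    """Count the files one micro-batch would write into each id=... directory."""
    tasks_holding_id = defaultdict(set)
    for row_index, row_id in enumerate(ids):
        # Assumption: round-robin assignment of rows to tasks.
        task = row_index % num_tasks
        tasks_holding_id[row_id].add(task)
    # One file per (task, partition directory) pair.
    return {row_id: len(tasks) for row_id, tasks in sorted(tasks_holding_id.items())}


# Hypothetical micro-batch: four rows for each of the ids 1, 2 and 3.
batch_ids = ["1"] * 4 + ["2"] * 4 + ["3"] * 4

print(simulate_files_per_dir(batch_ids, 1))  # one file per id directory
print(simulate_files_per_dir(batch_ids, 3))  # three files per id directory
```

Under this model the file count per directory is bounded by the number of tasks, which matches the one-file and three-file listings above.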
    

    You mentioned that you are using a 10-minute batch, so you should update your trigger interval accordingly. Right now it is 2 seconds, but it should be trigger(processingTime='10 minutes') (see class ProcessingTime(Trigger)). The 2-second trigger could be why you are getting so many small files.
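
To put rough numbers on the trigger interval, here is a back-of-the-envelope sketch. It assumes every micro-batch contains data for a given directory and that each batch adds at most one file per writing task to it, so the result is an upper bound rather than a measurement. The function name is hypothetical.

```python
def max_files_per_dir(window_seconds, trigger_seconds, tasks_per_batch=1):
    """Upper bound on files written into one partition directory in a time window."""
    batches = window_seconds // trigger_seconds
    return batches * tasks_per_batch


one_hour = 60 * 60

# 2-second trigger with repartition(1): up to 1800 files per directory per hour.
print(max_files_per_dir(one_hour, trigger_seconds=2))

# 10-minute trigger with repartition(1): up to 6 files per directory per hour.
print(max_files_per_dir(one_hour, trigger_seconds=600))
```

Even with repartition(1) in place, the short trigger alone can account for hundreds of small files per directory.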

    If you use repartition(1) with a 10-minute batch, there will be a lot of data shuffling, and only one core (per host, dt and h) will end up writing all your data, so you won't be able to use Spark's parallelism to its full extent. Another downside is that partitions may grow larger than 128 MB. Since you are not doing any aggregation in your query, you should decrease your batch size and use repartition(1) so that you end up with an acceptable partition size.
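
Applied to the query from the question, the advice above might look like the sketch below. It assumes df is a streaming DataFrame that already contains the host, dt and h columns. The function name is hypothetical, and the trigger interval is left as a parameter because the right value depends on your data volume.

```python
def start_single_file_sink(df, path, checkpoint_location, trigger_interval):
    """Start a Parquet sink that writes one file per partition directory per batch."""
    return (
        df.repartition(1)  # a single task writes each micro-batch
        .writeStream
        .partitionBy("host", "dt", "h")
        .format("parquet")
        .outputMode("append")
        .option("path", path)
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime=trigger_interval)  # e.g. '10 minutes'
        .start()
    )
```

Tuning then comes down to trying a few values of trigger_interval and checking the resulting file sizes in each partition directory.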