Tags: apache-spark, hadoop, pyspark, apache-spark-sql, parquet

Number of files saved by parquet writer in pyspark


How many files does a PySpark parquet write generate? I have read that the output is one file per in-memory partition. However, this does not always seem to be true.

I am running a cluster with 6 executors and 6 GB of executor memory per executor. All other memory settings (PySpark memory, memory overhead, off-heap) are 2 GB.

I am using the following data:

import numpy as np
import pandas as pd

dummy_data = spark.createDataFrame(
    pd.DataFrame({'a': np.random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 100000)}))

The following code, where I repartition without specifying a column to repartition by, always produces a number of files equal to the number of in-memory partitions:

df_dummy = dummy_data.repartition(200)
df_dummy.rdd.getNumPartitions()
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_wo_id")

# files generated: 200
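
A quick sanity check (sketch, assuming df_dummy from above is still defined) confirms that every one of those 200 partitions holds rows:

# number of rows in each in-memory partition
sizes = df_dummy.rdd.glom().map(len).collect()
print(len(sizes))                      # 200 partitions
print(sum(1 for s in sizes if s > 0))  # all 200 contain rows (round-robin spreads them evenly)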

However, the following code, where I do specify the column to repartition the data by, produces a seemingly arbitrary number of files:

df_dummy = dummy_data.repartition(200,'a')
df_dummy.rdd.getNumPartitions()
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_w_id")

# files generated: 11
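
A simple way to count the part-files a write produces (sketch; the local path below is a hypothetical stand-in for the gs:// bucket and assumes a single-machine run):

import glob

out_path = "/tmp/test_writes/df_dummy_repart_w_id"  # hypothetical local path
df_dummy.write.format("parquet").mode("overwrite").save(out_path)

# count only the data files, ignoring _SUCCESS and checksum files
part_files = glob.glob(out_path + "/part-*.parquet")
print(len(part_files))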

Can you help me understand the number of output files that get generated by the PySpark parquet writer?


Solution

  • This answer does not explain everything you're noticing, but it probably contains enough useful information that it would be a pity not to share it.

    The reason you're seeing a different number of output files is the way your data is ordered and distributed after those two repartition operations.

    • dummy_data.repartition(200) repartitions your individual rows using round-robin partitioning
      • the result is that your data keeps a random ordering, because your input data has a random ordering
    • dummy_data.repartition(200, 'a') uses hash partitioning on column a's values
      • the result is that your data is chopped up in a very specific way: hashing the column values means all rows where a == 1 always land in the same partition
      • since the number of distinct values of a (10) is smaller than the number of partitions (200), at most 10 partitions hold data, each containing a single distinct value of a, while the rest stay empty (see the sketch right after this list)
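
    A quick way to see this distribution (sketch; spark_partition_id simply tags each row with the id of the in-memory partition it lives in):

    from pyspark.sql import functions as F

    # count rows per in-memory partition after hash partitioning on column 'a'
    (dummy_data.repartition(200, 'a')
        .withColumn("pid", F.spark_partition_id())
        .groupBy("pid").count()
        .orderBy("pid")
        .show(200))
    # only ~10 partition ids show up; the other ~190 partitions are empty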

    Now, there is a pattern in the number of output part-files you receive:

    • In the case of dummy_data.repartition(200), you simply get the same number of part-files as partitions: 200 in your example.
    • In the other case, you get 11 part-files. If you look at the contents of those part-files, you will see 1 empty file plus 10 filled files, one for each distinct value in your original dataset. This leads to the conclusion that, while writing your files, something is being smart and merging those minuscule, identical files. I'm not sure whether this is Spark, the PARQUET_OUTPUT_COMMITTER_CLASS, or something else. (See the inspection sketch below.)
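
    One way to inspect those part-files (sketch; it assumes the output sits somewhere you can list locally, e.g. a copy of the gs:// directory):

    import glob
    import os

    out_path = "/tmp/test_writes/df_dummy_repart_w_id"  # hypothetical local copy of the output
    for f in sorted(glob.glob(out_path + "/part-*.parquet")):
        part = spark.read.parquet(f)
        distinct_a = [r["a"] for r in part.select("a").distinct().collect()]
        print(os.path.basename(f), part.count(), "rows, values:", distinct_a)
    # expected per the observation above: 1 file with 0 rows and 10 files each holding a single value of 'a'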

    Conclusion

    In general, you get the same number of part-files as the number of partitions.

    In your specific case, where you repartition by the column (which is the only value in the Row), each parquet part-file will contain a bunch of identical values. It seems that something (I don't know what) is being smart and merging files with the same values.

    In your case, you got 11 part-files because there is 1 empty file plus 10 files, one for each distinct value in your dataframe. Try changing np.random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 100000) to np.random.choice([1, 2, 3, 4, 5, 6, 7, 8], 100000) and you will see that you get 9 part-files (8 + 1).
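
    A minimal sketch of that experiment (the output path below is hypothetical; swap in your own bucket or path):

    import numpy as np
    import pandas as pd

    dummy_data_8 = spark.createDataFrame(
        pd.DataFrame({'a': np.random.choice([1, 2, 3, 4, 5, 6, 7, 8], 100000)}))
    out_path_8 = "/tmp/test_writes/df_dummy_repart_w_id_8"  # hypothetical path
    dummy_data_8.repartition(200, 'a').write.format("parquet").mode("overwrite").save(out_path_8)
    # per the reasoning above, this should yield 9 part-files: 8 with data + 1 empty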