How many files does a PySpark parquet write generate? I have read that the output is one file per in-memory partition. However, this does not always seem to be true.
I am running a cluster with 6 executors and 6G of executor memory per executor. All the rest (PySpark memory, overhead, off-heap) is set to 2G.
I am using the following data:
import numpy as np, pandas as pd

dummy_data = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,3,4,5,6,7,8,9,10],100000)}))
The following code, where I repartition without specifying a column to repartition by, always produces a number of files equal to the number of in-memory partitions:
df_dummy = dummy_data.repartition(200)
df_dummy.rdd.getNumPartitions() # 200
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_wo_id")
#files generated 200
However, the following code, where I do specify the column to repartition the data by, produces a seemingly arbitrary number of files:
df_dummy = dummy_data.repartition(200,'a')
df_dummy.rdd.getNumPartitions() # 200
df_dummy.write.format("parquet").save("gs://monsoon-credittech.appspot.com/spark_datasets/test_writes/df_dummy_repart_w_id")
#files generated 11
Can you help me understand how many output files the PySpark parquet writer generates?
This answer does not explain everything you're noticing, but it probably contains enough useful information that it would be a pity not to share it.
The reason you're seeing a different number of output files is the way your data is laid out after those two repartitions.
- dummy_data.repartition(200) repartitions your individual rows using round-robin partitioning, so the rows get spread evenly across all 200 partitions.
- dummy_data.repartition(200, 'a') uses hash partitioning according to the column a's values, so all rows with the same value (e.g. every row where a == 1) always end up in the same partition, one partition per a value.
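One way to observe this difference for yourself (assuming the dummy_data dataframe from the question) is to tag every row with spark_partition_id() and aggregate per partition; this is just an inspection sketch, not part of the original experiment:

from pyspark.sql import functions as F

for df in (dummy_data.repartition(200), dummy_data.repartition(200, 'a')):
    (df.withColumn('pid', F.spark_partition_id())   # partition each row landed in
       .groupBy('pid')
       .agg(F.count('*').alias('rows'),             # rows per partition
            F.countDistinct('a').alias('distinct_a'))
       .orderBy('pid')
       .show(200))

The round-robin version should show rows in all 200 partitions with a mix of a values, while the hash version should show at most 10 non-empty partitions (hash collisions permitting), each holding a single a value; partitions without rows simply don't appear in the output.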
Now, there is a pattern in the number of output part-files you receive:
In the case of dummy_data.repartition(200), you simply get the same number of part-files as partitions: 200 in your example. This might depend on your PARQUET_OUTPUT_COMMITTER_CLASS, or something else, but in general you get the same number of part-files as the number of partitions.
In your specific case, you're repartitioning by the column a, which is the only value in each Row, so each parquet part-file ends up containing a bunch of identical values. It seems that something (I don't know what) is being smart and merging files with the same values.
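If you want to count the part-files yourself, here is a minimal sketch; note that it writes to a hypothetical local path instead of your gs:// bucket, purely so glob can inspect the output directory:

import glob

out_path = '/tmp/df_dummy_repart_w_id'  # hypothetical local stand-in for the gs:// path
df_dummy.write.format('parquet').mode('overwrite').save(out_path)
print(len(glob.glob(out_path + '/part-*')))  # counts only the part-files, ignoring _SUCCESS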
In your case, you got 11 part-files because there is 1 empty file plus 1 file for each of the 10 distinct values in your dataframe. Try changing np.random.choice([1,2,3,4,5,6,7,8,9,10],100000) to np.random.choice([1,2,3,4,5,6,7,8],100000) and you will see that you get 9 part-files (8 + 1).
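As a hedged sketch of that experiment (same assumptions as above: a local output path, and the same spark session as in the question):

import glob
import numpy as np, pandas as pd

dummy_data_8 = spark.createDataFrame(
    pd.DataFrame({'a': np.random.choice([1, 2, 3, 4, 5, 6, 7, 8], 100000)}))
(dummy_data_8
    .repartition(200, 'a')                # hash partition on a, 8 distinct values
    .write.format('parquet')
    .mode('overwrite')
    .save('/tmp/df_dummy_8'))             # hypothetical local output path
print(len(glob.glob('/tmp/df_dummy_8/part-*')))  # if the pattern holds, this prints 9 (8 + 1)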