I'm running a simple PySpark job on 1 driver (16 cores) and 2 worker nodes (32 cores in total). The input is one hour of data for a single day, ~33 GB. The input also has a country column, with 968 distinct countries in the data.
I'm writing the data partitioned by date and country:
results.write.partitionBy("date","country").format("delta").save("<path>")
The stage that writes to the target location has 607 tasks in total, with 32 tasks running in parallel [384/607 (32 running)].
As per my understanding, spark writes 1 file per partition.
Question - In this stage that writes to the target location (160/607 (32 running)) I have 607 tasks in total, so shouldn't Spark write only 607 files? Instead, under each date+country folder it has generated a seemingly random number of files.
Just noticed that you're using the delta
format. That's a whole different ballgame than plain "spark". In particular, Delta exposes table properties like delta.targetFileSize
and delta.tuneFileSizesForRewrites
that influence the size (and hence the count) of the files it writes.
Delta is a higher-level/managed format. E.g. it keeps history, allows data-skipping, provides OPTIMIZE
to combine smaller underlying parquet files into bigger ones, and much more. To achieve all this it juggles a lot of metadata files behind the scenes, in addition to the parquet files.
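For illustration, the file-sizing knobs mentioned above are set as Delta table properties. A sketch, assuming a Delta table at the hypothetical path /tmp/events (the property names are real Delta/Databricks properties, but support varies by runtime, so check your platform's documentation):

```sql
-- Configuration sketch: nudge Delta toward a target file size and
-- let it tune file sizes for tables that are rewritten often.
ALTER TABLE delta.`/tmp/events`
SET TBLPROPERTIES (
  'delta.targetFileSize' = '134217728',        -- aim for ~128 MB files
  'delta.tuneFileSizesForRewrites' = 'true'    -- auto-tune for frequent rewrites
);

-- Compact existing small files into larger ones.
OPTIMIZE delta.`/tmp/events`;
```

With settings like these, the number of files under each date+country folder is driven by Delta's sizing logic rather than by the task count alone.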
The delta
format is VERY different from vanilla Spark formats like csv/parquet/etc. and is not directly comparable.
spark writes 1 file per partition
By default: Yes.
In general: No. The correct statement is: spark writes at least 1 file per partition.
A positive value for spark.sql.files.maxRecordsPerFile
(Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit) can lead to a file count greater than the partition count.
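To make that concrete, here is a minimal pure-Python sketch (the function name is mine, not a Spark API) of the arithmetic Spark effectively applies per task when maxRecordsPerFile is set:

```python
import math

def files_per_partition(records_in_partition: int, max_records_per_file: int) -> int:
    """Minimum number of files one Spark task must write for its partition,
    given spark.sql.files.maxRecordsPerFile (zero or negative = no limit)."""
    if max_records_per_file <= 0:
        return 1  # no limit: one file per (non-empty) partition
    return math.ceil(records_in_partition / max_records_per_file)

# A task holding 1,000,000 records with maxRecordsPerFile=300,000
# has to split its output across 4 files.
print(files_per_partition(1_000_000, 300_000))  # 4
print(files_per_partition(1_000_000, 0))        # 1
```

So with this setting active, 607 tasks can easily produce well over 607 files.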
Also note that if you're reading from some source (as opposed to creating the dataframe programmatically, say with .repartition(N)
), then certain configurations (e.g. spark.sql.files.maxPartitionBytes
) affect how many partitions the reader creates, which may not equal the number of input files.
So spark.read.csv('path-to-csvs-with-10-partitions').write.csv('output')
might produce more than 10 part-files in output
.
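A rough sketch of that reader-side arithmetic (deliberately simplified: real Spark also factors in spark.sql.files.openCostInBytes and the default parallelism when computing the split size, and the function name here is mine):

```python
import math

def approx_read_partitions(total_bytes: int,
                           max_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Approximate read-partition count for a splittable source:
    each partition holds at most spark.sql.files.maxPartitionBytes
    (default 128 MB) of input."""
    return math.ceil(total_bytes / max_partition_bytes)

# A splittable 1 GB input read with the default 128 MB setting
# yields ~8 partitions, regardless of how many source files it came from.
print(approx_read_partitions(1 * 1024**3))  # 8
```

This is why the partition count (and hence the output file count) of a read-then-write pipeline is tied to byte sizes, not to the source file count.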
An easy way to understand the relation between number of partitions and number of files is to write a small dataframe with partitionBy()
.
>>> df = spark.createDataFrame([(i, str(i)) for i in range(1,4)], schema='k: int, v: string')
>>> df.show()
+---+---+
| k| v|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
+---+---+
>>> df.write.partitionBy('k').parquet('out', mode='overwrite')
>>>
produces:
kash@ub$ tree ./out
./out
├── k=1
│ └── part-00001-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
├── k=2
│ └── part-00002-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
├── k=3
│ └── part-00003-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
└── _SUCCESS
3 directories, 4 files
kash@ub$
See: