apache-spark

Number of files generated by spark


I'm running a simple PySpark job on 1 driver (16 cores) and 2 worker nodes (32 cores total). The input is roughly one hour of data for a single day, ~33 GB. The input data also has a country column, and the number of distinct countries in the data is 968.

I'm writing the data partitioned by date and country.

results.write.partitionBy("date","country").format("delta").save("<path>")

The stage that writes to the target location has 607 tasks in total, with 32 tasks running in parallel [384/607 (32 running)].

As per my understanding,

  • spark writes 1 file per partition
  • number of tasks = number of partitions
  • so number of tasks = number of partitions = number of files

Question - In this stage that is writing to the target location [160/607 (32 running)] I have 607 tasks in total, so shouldn't Spark write only 607 files? Instead, under each date+country folder it has generated a seemingly random number of files.


Solution

  • Just noticed that you're using delta format. That's a whole different ballgame than "spark".

    • If you're using DBR (Databricks Runtime): DBR provides many tunable parameters, and the number of files depends on their values, e.g. delta.targetFileSize and delta.tuneFileSizesForRewrites (see the sketch after this list).
    • If you're using OSS (Open Source) delta.io: it is different from DBR.

    Delta is a higher-level/managed format. For example, it keeps history, allows data skipping, provides OPTIMIZE to combine smaller underlying parquet files into bigger ones, and much more. To achieve all this it juggles a lot of metadata files, in addition to the parquet files, behind the scenes.

    The delta format is VERY different from vanilla Spark formats like csv/parquet/etc. and is not directly comparable.
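
    For the DBR case, here is a minimal sketch of the knobs mentioned above, assuming Databricks Runtime with Delta Lake; the <path> placeholder and the target size value are illustrative, not prescriptive:

    # Sketch (assumes DBR/Delta Lake; the target size value is illustrative)
    # Ask Delta to aim for ~128 MB data files when writing this table
    spark.sql("ALTER TABLE delta.`<path>` SET TBLPROPERTIES ('delta.targetFileSize' = '128mb')")

    # Compact the small parquet files already written into fewer, larger ones
    spark.sql("OPTIMIZE delta.`<path>`")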


    spark writes 1 file per partition

    By default: Yes.

    In general: No. The correct statement is: Spark writes at least 1 file per partition.

    A non-zero value for spark.sql.files.maxRecordsPerFile ("Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit.") can lead to a file count greater than the partition count.
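
    For example, a minimal sketch of that effect; the record cap, row count, and output path are illustrative assumptions:

    # Sketch: cap records per output file, so a single partition splits into several files
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)

    df = spark.range(10000).repartition(1)            # exactly 1 partition
    df.write.mode("overwrite").parquet("out_capped")
    # still 1 partition, but ~10 part-files, since each file holds at most 1000 records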

    Also note that if you're reading from some source (as opposed to creating the dataframe programmatically and controlling its layout with, say, .repartition(N)), then certain configurations (e.g. spark.sql.files.maxPartitionBytes) affect how many partitions the reader creates, which may not equal the number of files in the source.

    So spark.read.csv('path-to-csvs-with-10-partitions').write.csv('output') might produce more than 10 files in the output.
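
    A hedged sketch of that read-side effect, reusing the placeholder paths above; the 32 MB value is an illustrative assumption:

    # Sketch: a smaller maxPartitionBytes makes the reader split the input into more
    # partitions, and the writer then produces at least one file per partition
    spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))  # 32 MB

    df = spark.read.csv('path-to-csvs-with-10-partitions')
    print(df.rdd.getNumPartitions())   # can be greater than 10 if the source files are large
    df.write.csv('output')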


    An easy way to understand the relation between the number of partitions and the number of files is to write a small dataframe with partitionBy():

    >>> df = spark.createDataFrame([(i, str(i)) for i in range(1,4)], schema='k: int, v: string')
    >>> df.show()
    +---+---+
    |  k|  v|
    +---+---+
    |  1|  1|
    |  2|  2|
    |  3|  3|
    +---+---+
    
    >>> df.write.partitionBy('k').parquet('out', mode='overwrite')
    >>> 
    

    produces:

    kash@ub$ tree ./out
    ./out
    ├── k=1
    │   └── part-00001-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
    ├── k=2
    │   └── part-00002-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
    ├── k=3
    │   └── part-00003-f9992c95-121a-4d23-9548-762eced47a5d.c000.snappy.parquet
    └── _SUCCESS
    
    3 directories, 4 files
    kash@ub$ 
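
    Conversely, here is a hedged sketch of why each date+country folder in the question can hold several files: when the rows for one partition-column value are spread across multiple tasks, each task writes its own file into that folder. The column name, row counts and output path below are illustrative:

    >>> df = spark.createDataFrame([(i % 3, str(i)) for i in range(9)], schema='k: int, v: string')
    >>> # spread every k value across 4 tasks, then write partitioned by k
    >>> df.repartition(4).write.partitionBy('k').parquet('out2', mode='overwrite')

    Now each k=... directory can contain up to 4 part-files, one per task that held rows for that value, which is the same multiple-files-per-folder pattern seen under date+country.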
    

    See: