Tags: apache-spark, pyspark, rdd, binaryfiles, partitioning

PySpark: Partitioning while reading a binary file using binaryFiles() function


from pyspark import SparkContext

sc = SparkContext("local")
rdd = sc.binaryFiles("/path/to/binary/file", minPartitions=5).partitionBy(8)

or

sc = SparkContext("local")
rdd = sc.binaryFiles("/path/to/binary/file", minPartitions=5).repartition(8)

Using either of the above snippets, I am trying to create 8 partitions in my RDD, with the data distributed evenly across all of them. When I print rdd.getNumPartitions(), the number of partitions shown is 8, but on the Spark UI I can see that although 8 partitions are created, all of the binary file's data is placed on only one partition.
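
For reference, here is how I am checking the actual distribution (a minimal check; binaryFiles() yields one (filename, bytes) record per input file, and the printed output is illustrative):

counts = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(counts)  # e.g. [1, 0, 0, 0, 0, 0, 0, 0] -> all data on one partition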

Note: the minPartitions argument is not working. Even after setting minPartitions=5, the number of partitions in the RDD is only 1. That is why I used the partitionBy/repartition functions.

Is this the expected behaviour, or am I missing something?


Solution

  • In Spark 2.4+, the problem should be fixed; see @Rahul's comment below this answer.

    In Spark 2.1-2.3, the minPartitions argument of binaryFiles() is ignored. See SPARK-16575 and the commit changes to the setMinPartitions() function. Notice in the commit changes that minPartitions is no longer used in the function!

    If you are reading multiple binary files with binaryFiles(), the input files will be coalesced into partitions based on the following:

    • spark.files.maxPartitionBytes, default 128 MB
    • spark.files.openCostInBytes, default 4 MB
    • spark.default.parallelism
    • the total size of your input

    The first three config items are described here. See the commit change above to see the actual calculation.
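
    As a rough Python sketch of that calculation (the names here are simplified; the linked Scala commit is the authoritative source, and note that minPartitions never appears in it):

    def max_split_size(total_input_bytes,
                       max_partition_bytes=128 * 1024 * 1024,  # spark.files.maxPartitionBytes
                       open_cost_in_bytes=4 * 1024 * 1024,     # spark.files.openCostInBytes
                       default_parallelism=8):                 # spark.default.parallelism
        # Spark caps each partition at maxPartitionBytes, but also shrinks the
        # cap toward totalBytes / parallelism so that every core gets work.
        bytes_per_core = total_input_bytes // default_parallelism
        return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))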

    I had a scenario where I wanted a maximum of 40 MB per input partition, and hence 40 MB per task, to increase parallelism while parsing. (Spark was putting 128 MB into each partition, slowing down my app.) I set spark.files.maxPartitionBytes to 40 MB before calling binaryFiles():

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .config("spark.files.maxPartitionBytes", 40 * 1024 * 1024) \
        .getOrCreate()
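
    With the session configured this way, the files can then be read through its SparkContext (the directory path below is hypothetical):

    sc = spark.sparkContext
    rdd = sc.binaryFiles("/data/binary/")  # partitions now sized by the 40 MB cap
    print(rdd.getNumPartitions())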

    If you are reading only one input file, @user9864979's answer is correct: a single file cannot be split into multiple partitions using just binaryFiles().
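
    To see why repartitioning doesn't help here: binaryFiles() produces a single (path, content) record per file, and repartition() only shuffles whole records; it cannot split that one record. A minimal illustration (the path is hypothetical, the printed output illustrative):

    rdd = sc.binaryFiles("/path/to/one/file")  # exactly 1 record
    counts = (rdd.repartition(8)
                 .mapPartitions(lambda it: [sum(1 for _ in it)])
                 .collect())
    print(counts)  # e.g. [0, 0, 0, 1, 0, 0, 0, 0]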


    When reading multiple files with Spark 1.6, the minPartitions argument does work, and you have to use it. If you don't, you'll hit the SPARK-16575 problem: all of your input files will be read into only two partitions!

    You will find that Spark will normally give you fewer input partitions than you request. I had a scenario where I wanted one input partition for every two input binary files. I found that setting minPartitions to "the # of input files * 7 / 10" gave me roughly what I wanted.
    I had another scenario where I wanted one input partition for each input file. I found that setting minPartitions to "the # of input files * 2" gave me what I wanted.
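
    As a sketch of that second heuristic (the input location is hypothetical, and the multiplier is just what worked for me):

    import glob

    n_files = len(glob.glob("/data/binary/*.bin"))
    rdd = sc.binaryFiles("/data/binary/*.bin", minPartitions=n_files * 2)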

    Spark 1.5 behavior of binaryFiles(): you get one partition for each input file.