apache-spark apache-spark-sql grouping partitioning

Partition a Spark DataFrame based on values in an existing column into a chosen number of partitions


I would like to partition a Spark DataFrame into a chosen number of evenly sized partitions, based on an index column, before writing to a file. I would like to control how many partitions to create based on the size of the DataFrame, and then use them when writing to a Parquet file using partitionBy.

Having an example DataFrame:

 i     b
 0    11
 1     9
 2    13
 3     2
 4    15
 5     3
 6    14
 7    16
 8    11
 9     9
 10   17
 11   10

Assuming that I would like to create 4 partitions based on the values in column i, the partitions would correspond to the values assigned to column g:

g    i     b
0    0    11
0    1     9
0    2    13
1    3     2
1    4    15
1    5     3
2    6    14
2    7    16
2    8    11
3    9     9
3   10    17
3   11    10

What is the preferred way of doing this in Spark?
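
For reference, here is one way to derive the g column shown above, as a sketch only (it assumes a spark-shell session with spark.implicits._ in scope, and the global window ordering funnels all rows through a single partition, so it is an illustration rather than a scalable approach):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.ntile

    // The example data from above, with columns i and b.
    val example = Seq((0, 11), (1, 9), (2, 13), (3, 2), (4, 15), (5, 3),
                      (6, 14), (7, 16), (8, 11), (9, 9), (10, 17), (11, 10)).toDF("i", "b")

    // ntile(4) buckets the rows ordered by i into 4 roughly equal groups (1..4);
    // subtracting 1 yields the 0-based g values shown in the table.
    val withG = example.withColumn("g", ntile(4).over(Window.orderBy($"i")) - 1)
    withG.show()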


Solution

  • The documentation is a little hard to follow, and I am making some assumptions about the question - i.e. that you want 4 (or rather N) files as output, assigned in ascending order of the id given as column "i". Below is my own Spark 2.4 example that takes 20 records, splits them into 4 evenly ranged partitions, and then writes them out. Let's go:

    // Assumes a spark-shell session (spark.implicits._ in scope).
    // Generate 20 dummy records with an ascending customer_id.
    val list = sc.makeRDD(1 to 20).map((_, 1, "2019-01-01", "2019-01-01", 1, 2, "XXXXXXXXXXXXXXXXXXXXXXXXXX"))

    val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
    df.show(false)
    

    Showing a few entries only:

    +-----------+-----+-----------+----------+-------+-------+--------------------------+
    |customer_id|dummy|report_date|date      |value_1|value_2|dummy_string              |
    +-----------+-----+-----------+----------+-------+-------+--------------------------+
    |1          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |2          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |3          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |4          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |5          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |6          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    |7          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
    ...
    

    Then, including some extra sorting for good measure (this is not necessary, and it works with all output formats):

    // Range-partition into 4 partitions on customer_id, so each output file covers a contiguous range of ids.
    df.repartitionByRange(4, $"customer_id")
      .sortWithinPartitions("customer_id", "date", "value_1")
      .write
      .parquet("/tmp/SOQ6")
    

    The write gave 4 files, as shown in the screenshot below:

    [screenshot: directory listing of the 4 Parquet part files written to /tmp/SOQ6]

    You can see the 4 files, and the naming of the first and last parts is evident. Running:

    // Read back one of the part files and inspect its contents.
    val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
    val words = lines.collect
    lines.count
    

    reveals 5 records, with their content ordered consecutively as per the DataFrame:

    lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
     words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
    res11: Long = 5
    

    I ran this on all the files, but only show one here.
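
    To check all the output files in one go, here is a small sketch (not part of the original run) that groups the read-back data by input_file_name:

    import org.apache.spark.sql.functions.input_file_name

    // Row count per Parquet part file; each of the 4 files should hold about 5 of the 20 records.
    spark.read.parquet("/tmp/SOQ6")
      .groupBy(input_file_name().as("file"))
      .count()
      .show(false)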

    Final comments

    Whether this is a good idea is a different story - e.g. think of non-broadcast JOINs, where this partitioning scheme can be an issue.

    In addition, I would obviously not hard-code the 4, but apply some formula for N to be used in the repartitionByRange. E.g.:

    val N = ???  // some calculation based on counts in the DF and your cluster
    val df2 = df.repartitionByRange(N, $"c1", $"c2")
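
    A minimal sketch of one such formula, assuming you aim for a rough number of rows per output file (targetRowsPerFile and the output path are my assumptions, not from the original run):

    // Assumption: target about 500k rows per output file; tune for your cluster and row width.
    val targetRowsPerFile = 500000L
    val N = math.max(1, math.ceil(df.count().toDouble / targetRowsPerFile).toInt)

    df.repartitionByRange(N, $"customer_id")
      .write
      .parquet("/tmp/SOQ6_N")  // hypothetical output path for this sketch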
    

    You have to test the DataFrameWriter behaviour yourself, as the documentation is not entirely clear.

    I also checked this on an EMR cluster with 2M records; the output was likewise 4 files.